Whamcloud - gitweb
d835708f6bc21cb9e4b37e6b444186104c9d4fd6
[fs/lustre-release.git] / lustre / osd-zfs / osd_scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osd-zfs/osd_scrub.c
27  *
28  * Top-level entry points into osd module
29  *
30  * The OI scrub is used for rebuilding Object Index files when restores MDT from
31  * file-level backup.
32  *
33  * The otable based iterator scans ZFS objects to feed up layer LFSCK.
34  *
35  * Author: Fan Yong <fan.yong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LFSCK
39
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49 #include <sys/zap_impl.h>
50 #include <sys/zap.h>
51 #include <sys/zap_leaf.h>
52
53 #include "osd_internal.h"
54
55 #define OSD_OTABLE_MAX_HASH             ((1ULL << 48) - 1)
56 #define OTABLE_PREFETCH                 256
57
58 #define DTO_INDEX_INSERT                1
59 #define DTO_INDEX_DELETE                2
60 #define DTO_INDEX_UPDATE                3
61
62 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
63 {
64         return it->ooi_prefetched < OTABLE_PREFETCH;
65 }
66
67 /**
68  * update/insert/delete the specified OI mapping (@fid @id) according to the ops
69  *
70  * \retval   1, changed nothing
71  * \retval   0, changed successfully
72  * \retval -ve, on error
73  */
74 static int osd_scrub_refresh_mapping(const struct lu_env *env,
75                                      struct osd_device *dev,
76                                      const struct lu_fid *fid,
77                                      uint64_t oid, int ops,
78                                      bool force, const char *name)
79 {
80         struct osd_thread_info *info = osd_oti_get(env);
81         struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
82         char *buf = info->oti_str;
83         dmu_tx_t *tx = NULL;
84         dnode_t *dn = NULL;
85         uint64_t zapid;
86         int rc;
87         ENTRY;
88
89         if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
90                 GOTO(log, rc = 0);
91
92         tx = dmu_tx_create(dev->od_os);
93         if (!tx)
94                 GOTO(log, rc = -ENOMEM);
95
96         zapid = osd_get_name_n_idx(env, dev, fid, buf,
97                                    sizeof(info->oti_str), &dn);
98         osd_tx_hold_zap(tx, zapid, dn,
99                         ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
100         rc = -dmu_tx_assign(tx, TXG_WAIT);
101         if (rc) {
102                 dmu_tx_abort(tx);
103                 GOTO(log, rc);
104         }
105
106         switch (ops) {
107         case DTO_INDEX_UPDATE:
108                 zde->zde_pad = 0;
109                 zde->zde_dnode = oid;
110                 zde->zde_type = 0; /* The type in OI mapping is useless. */
111                 rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
112                                  zde, tx);
113                 if (unlikely(rc == -ENOENT)) {
114                         /* Some unlink thread may removed the OI mapping. */
115                         rc = 1;
116                 }
117                 break;
118         case DTO_INDEX_INSERT:
119                 zde->zde_pad = 0;
120                 zde->zde_dnode = oid;
121                 zde->zde_type = 0; /* The type in OI mapping is useless. */
122                 rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
123                                  zde, tx);
124                 if (unlikely(rc == -EEXIST))
125                         rc = 1;
126                 break;
127         case DTO_INDEX_DELETE:
128                 rc = osd_zap_remove(dev, zapid, dn, buf, tx);
129                 if (rc == -ENOENT) {
130                         /* It is normal that the unlink thread has removed the
131                          * OI mapping already. */
132                         rc = 1;
133                 }
134                 break;
135         default:
136                 LASSERTF(0, "Unexpected ops %d\n", ops);
137                 rc = -EINVAL;
138                 break;
139         }
140
141         dmu_tx_commit(tx);
142         GOTO(log, rc);
143
144 log:
145         CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
146                DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
147                force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
148
149         return rc;
150 }
151
152 static int
153 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
154                        const struct lu_fid *fid, uint64_t oid, int val)
155 {
156         struct lustre_scrub *scrub = &dev->od_scrub;
157         struct scrub_file *sf = &scrub->os_file;
158         struct osd_inconsistent_item *oii = NULL;
159         nvlist_t *nvbuf = NULL;
160         dnode_t *dn = NULL;
161         uint64_t oid2;
162         int ops = DTO_INDEX_UPDATE;
163         int rc;
164         ENTRY;
165
166         down_write(&scrub->os_rwsem);
167         scrub->os_new_checked++;
168         if (val < 0)
169                 GOTO(out, rc = val);
170
171         if (scrub->os_in_prior)
172                 oii = list_entry(scrub->os_inconsistent_items.next,
173                                  struct osd_inconsistent_item, oii_list);
174
175         if (oid < sf->sf_pos_latest_start && !oii)
176                 GOTO(out, rc = 0);
177
178         if (oii && oii->oii_insert) {
179                 ops = DTO_INDEX_INSERT;
180                 goto zget;
181         }
182
183         rc = osd_fid_lookup(env, dev, fid, &oid2);
184         if (rc) {
185                 if (rc != -ENOENT)
186                         GOTO(out, rc);
187
188                 ops = DTO_INDEX_INSERT;
189
190 zget:
191                 rc = __osd_obj2dnode(dev->od_os, oid, &dn);
192                 if (rc) {
193                         /* Someone removed the object by race. */
194                         if (rc == -ENOENT || rc == -EEXIST)
195                                 rc = 0;
196                         GOTO(out, rc);
197                 }
198
199                 scrub->os_full_speed = 1;
200                 sf->sf_flags |= SF_INCONSISTENT;
201         } else if (oid == oid2) {
202                 GOTO(out, rc = 0);
203         } else {
204                 struct lustre_mdt_attrs *lma = NULL;
205                 int size;
206
207                 rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
208                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
209                         goto update;
210                 if (rc)
211                         GOTO(out, rc);
212
213                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
214                                                (uchar_t **)&lma, &size);
215                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
216                         goto update;
217                 if (rc)
218                         GOTO(out, rc);
219
220                 lustre_lma_swab(lma);
221                 if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
222                         CDEBUG(D_LFSCK, "%s: the FID "DFID" is used by "
223                                "two objects: %llu and %llu (in OI)\n",
224                                osd_name(dev), PFID(fid), oid, oid2);
225
226                         GOTO(out, rc = -EEXIST);
227                 }
228
229 update:
230                 scrub->os_full_speed = 1;
231                 sf->sf_flags |= SF_INCONSISTENT;
232         }
233
234         rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
235         if (!rc) {
236                 if (scrub->os_in_prior)
237                         sf->sf_items_updated_prior++;
238                 else
239                         sf->sf_items_updated++;
240         }
241
242         GOTO(out, rc);
243
244 out:
245         if (nvbuf)
246                 nvlist_free(nvbuf);
247
248         if (rc < 0) {
249                 sf->sf_items_failed++;
250                 if (sf->sf_pos_first_inconsistent == 0 ||
251                     sf->sf_pos_first_inconsistent > oid)
252                         sf->sf_pos_first_inconsistent = oid;
253         } else {
254                 rc = 0;
255         }
256
257         /* There may be conflict unlink during the OI scrub,
258          * if happend, then remove the new added OI mapping. */
259         if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
260                 osd_scrub_refresh_mapping(env, dev, fid, oid,
261                                           DTO_INDEX_DELETE, false, NULL);
262         up_write(&scrub->os_rwsem);
263
264         if (dn)
265                 osd_dnode_rele(dn);
266
267         if (oii) {
268                 spin_lock(&scrub->os_lock);
269                 if (likely(!list_empty(&oii->oii_list)))
270                         list_del(&oii->oii_list);
271                 spin_unlock(&scrub->os_lock);
272                 OBD_FREE_PTR(oii);
273         }
274
275         RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
276 }
277
278 static int osd_scrub_prep(const struct lu_env *env, struct osd_device *dev)
279 {
280         struct lustre_scrub *scrub = &dev->od_scrub;
281         struct ptlrpc_thread *thread = &scrub->os_thread;
282         struct scrub_file *sf = &scrub->os_file;
283         __u32 flags = scrub->os_start_flags;
284         int rc;
285         bool drop_dryrun = false;
286         ENTRY;
287
288         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
289                scrub->os_name, flags);
290
291         down_write(&scrub->os_rwsem);
292         if (flags & SS_SET_FAILOUT)
293                 sf->sf_param |= SP_FAILOUT;
294         else if (flags & SS_CLEAR_FAILOUT)
295                 sf->sf_param &= ~SP_FAILOUT;
296
297         if (flags & SS_SET_DRYRUN) {
298                 sf->sf_param |= SP_DRYRUN;
299         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
300                 sf->sf_param &= ~SP_DRYRUN;
301                 drop_dryrun = true;
302         }
303
304         if (flags & SS_RESET)
305                 scrub_file_reset(scrub, dev->od_uuid, 0);
306
307         scrub->os_partial_scan = 0;
308         if (flags & SS_AUTO_FULL) {
309                 scrub->os_full_speed = 1;
310                 sf->sf_flags |= SF_AUTO;
311         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
312                                    SF_UPGRADE)) {
313                 scrub->os_full_speed = 1;
314         } else {
315                 scrub->os_full_speed = 0;
316         }
317
318         spin_lock(&scrub->os_lock);
319         scrub->os_in_prior = 0;
320         scrub->os_waiting = 0;
321         scrub->os_paused = 0;
322         scrub->os_in_join = 0;
323         scrub->os_full_scrub = 0;
324         spin_unlock(&scrub->os_lock);
325         scrub->os_new_checked = 0;
326         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
327                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
328         else if (sf->sf_pos_last_checkpoint != 0)
329                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
330         else
331                 sf->sf_pos_latest_start = 1;
332
333         scrub->os_pos_current = sf->sf_pos_latest_start;
334         sf->sf_status = SS_SCANNING;
335         sf->sf_time_latest_start = ktime_get_real_seconds();
336         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
337         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
338         rc = scrub_file_store(env, scrub);
339         if (!rc) {
340                 spin_lock(&scrub->os_lock);
341                 thread_set_flags(thread, SVC_RUNNING);
342                 spin_unlock(&scrub->os_lock);
343                 wake_up_all(&thread->t_ctl_waitq);
344         }
345         up_write(&scrub->os_rwsem);
346
347         RETURN(rc);
348 }
349
350 static int osd_scrub_post(const struct lu_env *env, struct osd_device *dev,
351                           int result)
352 {
353         struct lustre_scrub *scrub = &dev->od_scrub;
354         struct scrub_file *sf = &scrub->os_file;
355         int rc;
356         ENTRY;
357
358         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
359                scrub->os_name, result);
360
361         down_write(&scrub->os_rwsem);
362         spin_lock(&scrub->os_lock);
363         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
364         spin_unlock(&scrub->os_lock);
365         if (scrub->os_new_checked > 0) {
366                 sf->sf_items_checked += scrub->os_new_checked;
367                 scrub->os_new_checked = 0;
368                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
369         }
370         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
371         if (result > 0) {
372                 sf->sf_status = SS_COMPLETED;
373                 if (!(sf->sf_param & SP_DRYRUN)) {
374                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
375                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
376                                           SF_UPGRADE | SF_AUTO);
377                 }
378                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
379                 sf->sf_success_count++;
380         } else if (result == 0) {
381                 if (scrub->os_paused)
382                         sf->sf_status = SS_PAUSED;
383                 else
384                         sf->sf_status = SS_STOPPED;
385         } else {
386                 sf->sf_status = SS_FAILED;
387         }
388         sf->sf_run_time += ktime_get_seconds() -
389                            scrub->os_time_last_checkpoint;
390
391         rc = scrub_file_store(env, scrub);
392         up_write(&scrub->os_rwsem);
393
394         RETURN(rc < 0 ? rc : result);
395 }
396
397 /* iteration engine */
398
399 static inline int
400 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
401 {
402         spin_lock(&scrub->os_lock);
403         if (osd_scrub_has_window(it) ||
404             !list_empty(&scrub->os_inconsistent_items) ||
405             it->ooi_waiting || !thread_is_running(&scrub->os_thread))
406                 scrub->os_waiting = 0;
407         else
408                 scrub->os_waiting = 1;
409         spin_unlock(&scrub->os_lock);
410
411         return !scrub->os_waiting;
412 }
413
414 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
415                           struct lu_fid *fid, uint64_t *oid)
416 {
417         struct lustre_scrub *scrub = &dev->od_scrub;
418         struct ptlrpc_thread *thread = &scrub->os_thread;
419         struct osd_otable_it *it = dev->od_otable_it;
420         struct lustre_mdt_attrs *lma = NULL;
421         nvlist_t *nvbuf = NULL;
422         int size = 0;
423         int rc = 0;
424         ENTRY;
425
426         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
427                 wait_event_idle_timeout(
428                         thread->t_ctl_waitq,
429                         !list_empty(&scrub->os_inconsistent_items) ||
430                         !thread_is_running(thread),
431                         cfs_time_seconds(cfs_fail_val));
432
433                 if (unlikely(!thread_is_running(thread)))
434                         RETURN(SCRUB_NEXT_EXIT);
435         }
436
437         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
438                 spin_lock(&scrub->os_lock);
439                 thread_set_flags(thread, SVC_STOPPING);
440                 spin_unlock(&scrub->os_lock);
441                 RETURN(SCRUB_NEXT_CRASH);
442         }
443
444         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
445                 RETURN(SCRUB_NEXT_FATAL);
446
447 again:
448         if (nvbuf) {
449                 nvlist_free(nvbuf);
450                 nvbuf = NULL;
451                 lma = NULL;
452         }
453
454         if (!list_empty(&scrub->os_inconsistent_items)) {
455                 spin_lock(&scrub->os_lock);
456                 if (likely(!list_empty(&scrub->os_inconsistent_items))) {
457                         struct osd_inconsistent_item *oii;
458
459                         oii = list_entry(scrub->os_inconsistent_items.next,
460                                 struct osd_inconsistent_item, oii_list);
461                         *fid = oii->oii_cache.oic_fid;
462                         *oid = oii->oii_cache.oic_dnode;
463                         scrub->os_in_prior = 1;
464                         spin_unlock(&scrub->os_lock);
465
466                         GOTO(out, rc = 0);
467                 }
468                 spin_unlock(&scrub->os_lock);
469         }
470
471         if (!scrub->os_full_speed && !osd_scrub_has_window(it))
472                 wait_event_idle(thread->t_ctl_waitq,
473                                 osd_scrub_wakeup(scrub, it));
474
475         if (unlikely(!thread_is_running(thread)))
476                 GOTO(out, rc = SCRUB_NEXT_EXIT);
477
478         rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
479         if (rc)
480                 GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
481
482         rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
483         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
484                 goto again;
485
486         if (rc)
487                 GOTO(out, rc);
488
489         LASSERT(nvbuf != NULL);
490         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
491                                        (uchar_t **)&lma, &size);
492         if (!rc) {
493                 lustre_lma_swab(lma);
494                 if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
495                            !(lma->lma_incompat & LMAI_AGENT))) {
496                         *fid = lma->lma_self_fid;
497                         *oid = scrub->os_pos_current;
498
499                         GOTO(out, rc = 0);
500                 }
501         }
502
503         if (!scrub->os_full_speed) {
504                 spin_lock(&scrub->os_lock);
505                 it->ooi_prefetched++;
506                 if (it->ooi_waiting) {
507                         it->ooi_waiting = 0;
508                         wake_up_all(&thread->t_ctl_waitq);
509                 }
510                 spin_unlock(&scrub->os_lock);
511         }
512
513         goto again;
514
515 out:
516         if (nvbuf)
517                 nvlist_free(nvbuf);
518
519         return rc;
520 }
521
522 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
523                           const struct lu_fid *fid, uint64_t oid, int rc)
524 {
525         struct lustre_scrub *scrub = &dev->od_scrub;
526         struct ptlrpc_thread *thread = &scrub->os_thread;
527         struct osd_otable_it *it = dev->od_otable_it;
528
529         rc = osd_scrub_check_update(env, dev, fid, oid, rc);
530         if (!scrub->os_in_prior) {
531                 if (!scrub->os_full_speed) {
532                         spin_lock(&scrub->os_lock);
533                         it->ooi_prefetched++;
534                         if (it->ooi_waiting) {
535                                 it->ooi_waiting = 0;
536                                 wake_up_all(&thread->t_ctl_waitq);
537                         }
538                         spin_unlock(&scrub->os_lock);
539                 }
540         } else {
541                 scrub->os_in_prior = 0;
542         }
543
544         if (rc)
545                 return rc;
546
547         rc = scrub_checkpoint(env, scrub);
548         if (rc) {
549                 CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: "
550                        "rc = %d\n", scrub->os_name, scrub->os_pos_current, rc);
551                 /* Continue, as long as the scrub itself can go ahead. */
552         }
553
554         return 0;
555 }
556
557 static int osd_scrub_main(void *args)
558 {
559         struct lu_env env;
560         struct osd_device *dev = (struct osd_device *)args;
561         struct lustre_scrub *scrub = &dev->od_scrub;
562         struct ptlrpc_thread *thread = &scrub->os_thread;
563         struct lu_fid *fid;
564         uint64_t oid;
565         int rc = 0;
566         ENTRY;
567
568         rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
569         if (rc) {
570                 CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
571                        scrub->os_name, rc);
572                 GOTO(noenv, rc);
573         }
574
575         rc = osd_scrub_prep(&env, dev);
576         if (rc) {
577                 CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
578                        scrub->os_name, rc);
579                 GOTO(out, rc);
580         }
581
582         if (!scrub->os_full_speed) {
583                 struct osd_otable_it *it = dev->od_otable_it;
584
585                 wait_event_idle(thread->t_ctl_waitq,
586                                 it->ooi_user_ready ||
587                                 !thread_is_running(thread));
588
589                 if (unlikely(!thread_is_running(thread)))
590                         GOTO(post, rc = 0);
591
592                 scrub->os_pos_current = it->ooi_pos;
593         }
594
595         CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
596                scrub->os_name, scrub->os_start_flags,
597                scrub->os_pos_current);
598
599         fid = &osd_oti_get(&env)->oti_fid;
600         while (!rc && thread_is_running(thread)) {
601                 rc = osd_scrub_next(&env, dev, fid, &oid);
602                 switch (rc) {
603                 case SCRUB_NEXT_EXIT:
604                         GOTO(post, rc = 0);
605                 case SCRUB_NEXT_CRASH:
606                         spin_lock(&scrub->os_lock);
607                         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
608                         spin_unlock(&scrub->os_lock);
609                         GOTO(out, rc = -EINVAL);
610                 case SCRUB_NEXT_FATAL:
611                         GOTO(post, rc = -EINVAL);
612                 case SCRUB_NEXT_BREAK:
613                         GOTO(post, rc = 1);
614                 }
615
616                 rc = osd_scrub_exec(&env, dev, fid, oid, rc);
617         }
618
619         GOTO(post, rc);
620
621 post:
622         rc = osd_scrub_post(&env, dev, rc);
623         CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
624                scrub->os_name, scrub->os_pos_current, rc);
625
626 out:
627         while (!list_empty(&scrub->os_inconsistent_items)) {
628                 struct osd_inconsistent_item *oii;
629
630                 oii = list_entry(scrub->os_inconsistent_items.next,
631                                  struct osd_inconsistent_item, oii_list);
632                 list_del_init(&oii->oii_list);
633                 OBD_FREE_PTR(oii);
634         }
635
636         lu_env_fini(&env);
637
638 noenv:
639         spin_lock(&scrub->os_lock);
640         thread_set_flags(thread, SVC_STOPPED);
641         wake_up_all(&thread->t_ctl_waitq);
642         spin_unlock(&scrub->os_lock);
643         return rc;
644 }
645
646 /* initial OI scrub */
647
648 struct osd_lf_map;
649
650 typedef int (*handle_dirent_t)(const struct lu_env *, struct osd_device *,
651                                const char *, uint64_t, uint64_t,
652                                enum osd_lf_flags, bool);
653 static int osd_ios_varfid_hd(const struct lu_env *, struct osd_device *,
654                              const char *, uint64_t, uint64_t,
655                              enum osd_lf_flags, bool);
656 static int osd_ios_uld_hd(const struct lu_env *, struct osd_device *,
657                           const char *, uint64_t, uint64_t,
658                           enum osd_lf_flags, bool);
659
660 typedef int (*scan_dir_t)(const struct lu_env *, struct osd_device *,
661                           uint64_t, handle_dirent_t, enum osd_lf_flags);
662 static int osd_ios_general_sd(const struct lu_env *, struct osd_device *,
663                               uint64_t, handle_dirent_t, enum osd_lf_flags);
664 static int osd_ios_ROOT_sd(const struct lu_env *, struct osd_device *,
665                            uint64_t, handle_dirent_t, enum osd_lf_flags);
666
667 struct osd_lf_map {
668         char                    *olm_name;
669         struct lu_fid            olm_fid;
670         enum osd_lf_flags        olm_flags;
671         scan_dir_t               olm_scan_dir;
672         handle_dirent_t          olm_handle_dirent;
673 };
674
675 /* Add the new introduced local files in the list in the future. */
676 static const struct osd_lf_map osd_lf_maps[] = {
677         /* CONFIGS */
678         {
679                 .olm_name               = MOUNT_CONFIGS_DIR,
680                 .olm_fid                = {
681                         .f_seq  = FID_SEQ_LOCAL_FILE,
682                         .f_oid  = MGS_CONFIGS_OID,
683                 },
684                 .olm_flags              = OLF_SCAN_SUBITEMS,
685                 .olm_scan_dir           = osd_ios_general_sd,
686                 .olm_handle_dirent      = osd_ios_varfid_hd,
687         },
688
689         /* NIDTBL_VERSIONS */
690         {
691                 .olm_name               = MGS_NIDTBL_DIR,
692                 .olm_flags              = OLF_SCAN_SUBITEMS,
693                 .olm_scan_dir           = osd_ios_general_sd,
694                 .olm_handle_dirent      = osd_ios_varfid_hd,
695         },
696
697         /* PENDING */
698         {
699                 .olm_name               = MDT_ORPHAN_DIR,
700         },
701
702         /* ROOT */
703         {
704                 .olm_name               = "ROOT",
705                 .olm_fid                = {
706                         .f_seq  = FID_SEQ_ROOT,
707                         .f_oid  = FID_OID_ROOT,
708                 },
709                 .olm_flags              = OLF_SCAN_SUBITEMS,
710                 .olm_scan_dir           = osd_ios_ROOT_sd,
711         },
712
713         /* fld */
714         {
715                 .olm_name               = "fld",
716                 .olm_fid                = {
717                         .f_seq  = FID_SEQ_LOCAL_FILE,
718                         .f_oid  = FLD_INDEX_OID,
719                 },
720         },
721
722         /* changelog_catalog */
723         {
724                 .olm_name               = CHANGELOG_CATALOG,
725         },
726
727         /* changelog_users */
728         {
729                 .olm_name               = CHANGELOG_USERS,
730         },
731
732         /* quota_master */
733         {
734                 .olm_name               = QMT_DIR,
735                 .olm_flags              = OLF_SCAN_SUBITEMS,
736                 .olm_scan_dir           = osd_ios_general_sd,
737                 .olm_handle_dirent      = osd_ios_varfid_hd,
738         },
739
740         /* quota_slave */
741         {
742                 .olm_name               = QSD_DIR,
743                 .olm_flags              = OLF_SCAN_SUBITEMS,
744                 .olm_scan_dir           = osd_ios_general_sd,
745                 .olm_handle_dirent      = osd_ios_varfid_hd,
746         },
747
748         /* LFSCK */
749         {
750                 .olm_name               = LFSCK_DIR,
751                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
752                 .olm_scan_dir           = osd_ios_general_sd,
753                 .olm_handle_dirent      = osd_ios_varfid_hd,
754         },
755
756         /* lfsck_bookmark */
757         {
758                 .olm_name               = LFSCK_BOOKMARK,
759         },
760
761         /* lfsck_layout */
762         {
763                 .olm_name               = LFSCK_LAYOUT,
764         },
765
766         /* lfsck_namespace */
767         {
768                 .olm_name               = LFSCK_NAMESPACE,
769         },
770
771         /* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
772          * and f_oid = index for their log files.  See lu_update_log{_dir}_fid()
773          * for more details. */
774
775         /* update_log */
776         {
777                 .olm_name               = "update_log",
778                 .olm_fid                = {
779                         .f_seq  = FID_SEQ_UPDATE_LOG,
780                 },
781                 .olm_flags              = OLF_IDX_IN_FID,
782         },
783
784         /* update_log_dir */
785         {
786                 .olm_name               = "update_log_dir",
787                 .olm_fid        = {
788                         .f_seq  = FID_SEQ_UPDATE_LOG_DIR,
789                 },
790                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
791                 .olm_scan_dir           = osd_ios_general_sd,
792                 .olm_handle_dirent      = osd_ios_uld_hd,
793         },
794
795         /* hsm_actions */
796         {
797                 .olm_name               = HSM_ACTIONS,
798         },
799
800         /* nodemap */
801         {
802                 .olm_name               = LUSTRE_NODEMAP_NAME,
803         },
804
805         /* index_backup */
806         {
807                 .olm_name               = INDEX_BACKUP_DIR,
808                 .olm_fid                = {
809                         .f_seq  = FID_SEQ_LOCAL_FILE,
810                         .f_oid  = INDEX_BACKUP_OID,
811                 },
812                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
813                 .olm_scan_dir           = osd_ios_general_sd,
814                 .olm_handle_dirent      = osd_ios_varfid_hd,
815         },
816
817         {
818                 .olm_name               = NULL
819         }
820 };
821
822 /* Add the new introduced files under .lustre/ in the list in the future. */
823 static const struct osd_lf_map osd_dl_maps[] = {
824         /* .lustre/fid */
825         {
826                 .olm_name               = "fid",
827                 .olm_fid                = {
828                         .f_seq  = FID_SEQ_DOT_LUSTRE,
829                         .f_oid  = FID_OID_DOT_LUSTRE_OBF,
830                 },
831         },
832
833         /* .lustre/lost+found */
834         {
835                 .olm_name               = "lost+found",
836                 .olm_fid                = {
837                         .f_seq  = FID_SEQ_DOT_LUSTRE,
838                         .f_oid  = FID_OID_DOT_LUSTRE_LPF,
839                 },
840         },
841
842         {
843                 .olm_name               = NULL
844         }
845 };
846
847 struct osd_ios_item {
848         struct list_head        oii_list;
849         uint64_t                oii_parent;
850         enum osd_lf_flags       oii_flags;
851         scan_dir_t              oii_scan_dir;
852         handle_dirent_t         oii_handle_dirent;
853 };
854
855 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
856                             enum osd_lf_flags flags, scan_dir_t scan_dir,
857                             handle_dirent_t handle_dirent)
858 {
859         struct osd_ios_item *item;
860
861         OBD_ALLOC_PTR(item);
862         if (!item) {
863                 CWARN("%s: initial OI scrub failed to add item for %llu\n",
864                       osd_name(dev), parent);
865                 return -ENOMEM;
866         }
867
868         INIT_LIST_HEAD(&item->oii_list);
869         item->oii_parent = parent;
870         item->oii_flags = flags;
871         item->oii_scan_dir = scan_dir;
872         item->oii_handle_dirent = handle_dirent;
873         list_add_tail(&item->oii_list, &dev->od_ios_list);
874
875         return 0;
876 }
877
878 static bool osd_index_need_recreate(const struct lu_env *env,
879                                     struct osd_device *dev, uint64_t oid)
880 {
881         struct osd_thread_info *info = osd_oti_get(env);
882         zap_attribute_t *za = &info->oti_za2;
883         zap_cursor_t *zc = &info->oti_zc2;
884         int rc;
885         ENTRY;
886
887         zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
888         rc = -zap_cursor_retrieve(zc, za);
889         zap_cursor_fini(zc);
890         if (rc && rc != -ENOENT)
891                 RETURN(true);
892
893         RETURN(false);
894 }
895
896 static void osd_ios_index_register(const struct lu_env *env,
897                                    struct osd_device *osd,
898                                    const struct lu_fid *fid, uint64_t oid)
899 {
900         struct osd_thread_info *info = osd_oti_get(env);
901         zap_attribute_t *za = &info->oti_za2;
902         zap_cursor_t *zc = &info->oti_zc2;
903         struct zap_leaf_entry *le;
904         dnode_t *dn = NULL;
905         sa_handle_t *hdl;
906         __u64 mode = 0;
907         __u32 keysize = 0;
908         __u32 recsize = 0;
909         int rc;
910         ENTRY;
911
912         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
913         if (rc == -EEXIST || rc == -ENOENT)
914                 RETURN_EXIT;
915
916         if (rc < 0)
917                 GOTO(log, rc);
918
919         if (!osd_object_is_zap(dn))
920                 GOTO(log, rc = 1);
921
922         rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
923         if (rc)
924                 GOTO(log, rc);
925
926         rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
927         sa_handle_destroy(hdl);
928         if (rc)
929                 GOTO(log, rc);
930
931         if (!S_ISREG(mode))
932                 GOTO(log, rc = 1);
933
934         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
935         rc = -zap_cursor_retrieve(zc, za);
936         if (rc)
937                 /* Skip empty index object */
938                 GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
939
940         if (zc->zc_zap->zap_ismicro ||
941             !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
942                 GOTO(fini, rc = 1);
943
944         le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
945         keysize = le->le_name_numints * 8;
946         recsize = za->za_integer_length * za->za_num_integers;
947         if (likely(keysize && recsize))
948                 rc = osd_index_register(osd, fid, keysize, recsize);
949
950         GOTO(fini, rc);
951
952 fini:
953         zap_cursor_fini(zc);
954
955 log:
956         if (dn)
957                 osd_dnode_rele(dn);
958         if (rc < 0)
959                 CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
960                       osd_name(osd), PFID(fid), keysize, recsize, rc);
961         else if (!rc)
962                 CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
963                        osd_name(osd), PFID(fid), keysize, recsize);
964 }
965
966 static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
967                               struct lustre_index_restore_unit *liru, void *buf,
968                               int bufsize)
969 {
970         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
971         struct lu_fid *tgt_fid = &liru->liru_cfid;
972         struct lu_fid bak_fid;
973         int rc;
974         ENTRY;
975
976         lustre_fid2lbx(buf, tgt_fid, bufsize);
977         rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
978                          sizeof(*zde) / 8, (void *)zde);
979         if (rc)
980                 GOTO(log, rc);
981
982         rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
983         if (rc)
984                 GOTO(log, rc);
985
986         /* The OI mapping for index may be invalid, since it will be
987          * re-created, not update the OI mapping, just cache it in RAM. */
988         rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
989                                             liru->liru_clid);
990         if (!rc)
991                 rc = lustre_index_restore(env, &dev->od_dt_dev,
992                                 &liru->liru_pfid, tgt_fid, &bak_fid,
993                                 liru->liru_name, &dev->od_index_backup_list,
994                                 &dev->od_lock, buf, bufsize);
995         GOTO(log, rc);
996
997 log:
998         CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
999                osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
1000 }
1001
1002 /**
1003  * verify FID-in-LMA and OI entry for one object
1004  *
1005  * ios: Initial OI Scrub.
1006  */
1007 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
1008                             const struct lu_fid *fid, uint64_t parent,
1009                             uint64_t oid, const char *name,
1010                             enum osd_lf_flags flags)
1011 {
1012         struct lustre_scrub *scrub = &dev->od_scrub;
1013         struct scrub_file *sf = &scrub->os_file;
1014         struct lustre_mdt_attrs *lma = NULL;
1015         nvlist_t *nvbuf = NULL;
1016         struct lu_fid tfid;
1017         uint64_t oid2 = 0;
1018         __u64 flag = 0;
1019         int size = 0;
1020         int op = 0;
1021         int rc;
1022         ENTRY;
1023
1024         rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
1025         if (unlikely(rc == -ENOENT || rc == -EEXIST))
1026                 RETURN(0);
1027
1028         if (rc && rc != -ENODATA) {
1029                 CWARN("%s: initial OI scrub failed to get lma for %llu: "
1030                       "rc = %d\n", osd_name(dev), oid, rc);
1031
1032                 RETURN(rc);
1033         }
1034
1035         if (!rc) {
1036                 LASSERT(nvbuf != NULL);
1037                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1038                                                (uchar_t **)&lma, &size);
1039                 if (rc || size == 0) {
1040                         LASSERT(lma == NULL);
1041                         rc = -ENODATA;
1042                 } else {
1043                         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1044                         lustre_lma_swab(lma);
1045                         if (lma->lma_compat & LMAC_NOT_IN_OI) {
1046                                 nvlist_free(nvbuf);
1047                                 RETURN(0);
1048                         }
1049
1050                         if (lma->lma_compat & LMAC_IDX_BACKUP &&
1051                             osd_index_need_recreate(env, dev, oid)) {
1052                                 if (parent == dev->od_root) {
1053                                         lu_local_obj_fid(&tfid,
1054                                                          OSD_FS_ROOT_OID);
1055                                 } else {
1056                                         rc = osd_get_fid_by_oid(env, dev,
1057                                                                 parent, &tfid);
1058                                         if (rc) {
1059                                                 nvlist_free(nvbuf);
1060                                                 RETURN(rc);
1061                                         }
1062                                 }
1063
1064                                 rc = lustre_liru_new(
1065                                                 &dev->od_index_restore_list,
1066                                                 &tfid, &lma->lma_self_fid, oid,
1067                                                 name, strlen(name));
1068                                 nvlist_free(nvbuf);
1069                                 RETURN(rc);
1070                         }
1071
1072                         tfid = lma->lma_self_fid;
1073                         if (!(flags & OLF_NOT_BACKUP))
1074                                 osd_ios_index_register(env, dev, &tfid, oid);
1075                 }
1076                 nvlist_free(nvbuf);
1077         }
1078
1079         if (rc == -ENODATA) {
1080                 if (!fid) {
1081                         /* Skip the object without FID-in-LMA */
1082                         CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
1083                                osd_name(dev), oid);
1084
1085                         RETURN(0);
1086                 }
1087
1088                 LASSERT(!fid_is_zero(fid));
1089
1090                 tfid = *fid;
1091                 if (flags & OLF_IDX_IN_FID) {
1092                         LASSERT(dev->od_index >= 0);
1093
1094                         tfid.f_oid = dev->od_index;
1095                 }
1096         }
1097
1098         rc = osd_fid_lookup(env, dev, &tfid, &oid2);
1099         if (rc) {
1100                 if (rc != -ENOENT) {
1101                         CWARN("%s: initial OI scrub failed to lookup fid for "
1102                               DFID"=>%llu: rc = %d\n",
1103                               osd_name(dev), PFID(&tfid), oid, rc);
1104
1105                         RETURN(rc);
1106                 }
1107
1108                 flag = SF_RECREATED;
1109                 op = DTO_INDEX_INSERT;
1110         } else {
1111                 if (oid == oid2)
1112                         RETURN(0);
1113
1114                 flag = SF_INCONSISTENT;
1115                 op = DTO_INDEX_UPDATE;
1116         }
1117
1118         if (!(sf->sf_flags & flag)) {
1119                 scrub_file_reset(scrub, dev->od_uuid, flag);
1120                 rc = scrub_file_store(env, scrub);
1121                 if (rc)
1122                         RETURN(rc);
1123         }
1124
1125         rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
1126
1127         RETURN(rc > 0 ? 0 : rc);
1128 }
1129
1130 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
1131                              const char *name, uint64_t parent, uint64_t oid,
1132                              enum osd_lf_flags flags, bool is_dir)
1133 {
1134         int rc;
1135         ENTRY;
1136
1137         rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
1138         if (!rc && is_dir)
1139                 rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
1140                                       osd_ios_varfid_hd);
1141
1142         RETURN(rc);
1143 }
1144
1145 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
1146                           const char *name, uint64_t parent, uint64_t oid,
1147                           enum osd_lf_flags flags, bool is_dir)
1148 {
1149         struct lu_fid tfid;
1150         int rc;
1151         ENTRY;
1152
1153         /* skip any non-DFID format name */
1154         if (name[0] != '[')
1155                 RETURN(0);
1156
1157         /* skip the start '[' */
1158         sscanf(&name[1], SFID, RFID(&tfid));
1159         if (fid_is_sane(&tfid))
1160                 rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
1161         else
1162                 rc = -EIO;
1163
1164         RETURN(rc);
1165 }
1166
1167 /*
1168  * General scanner for the directories execpt /ROOT during initial OI scrub.
1169  * It scans the name entries under the given directory one by one. For each
1170  * entry, verifies its OI mapping via the given @handle_dirent.
1171  */
1172 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1173                               uint64_t parent, handle_dirent_t handle_dirent,
1174                               enum osd_lf_flags flags)
1175 {
1176         struct osd_thread_info *info = osd_oti_get(env);
1177         struct luz_direntry *zde = &info->oti_zde;
1178         zap_attribute_t *za = &info->oti_za;
1179         zap_cursor_t *zc = &info->oti_zc;
1180         int rc;
1181         ENTRY;
1182
1183         zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1184         rc = -zap_cursor_retrieve(zc, za);
1185         if (rc == -ENOENT)
1186                 zap_cursor_advance(zc);
1187         else if (rc)
1188                 GOTO(log, rc);
1189
1190         while (1) {
1191                 rc = -zap_cursor_retrieve(zc, za);
1192                 if (rc)
1193                         GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1194
1195                 /* skip the entry started with '.' */
1196                 if (likely(za->za_name[0] != '.')) {
1197                         rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1198                                         za->za_integer_length,
1199                                         sizeof(*zde) / za->za_integer_length,
1200                                         (void *)zde);
1201                         if (rc) {
1202                                 CWARN("%s: initial OI scrub failed to lookup "
1203                                       "%s under %llu: rc = %d\n",
1204                                       osd_name(dev), za->za_name, parent, rc);
1205                                 continue;
1206                         }
1207
1208                         rc = handle_dirent(env, dev, za->za_name, parent,
1209                                         zde->lzd_reg.zde_dnode, flags,
1210                                         S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1211                                         true : false);
1212                         CDEBUG(D_LFSCK, "%s: initial OI scrub handled %s under "
1213                                "%llu: rc = %d\n",
1214                                osd_name(dev), za->za_name, parent, rc);
1215                 }
1216
1217                 zap_cursor_advance(zc);
1218         }
1219
1220 log:
1221         if (rc)
1222                 CWARN("%s: initial OI scrub failed to scan the directory %llu: "
1223                       "rc = %d\n", osd_name(dev), parent, rc);
1224         zap_cursor_fini(zc);
1225
1226         return rc;
1227 }
1228
1229 /*
1230  * The scanner for /ROOT directory. It is not all the items under /ROOT will
1231  * be scanned during the initial OI scrub, instead, only the .lustre and the
1232  * sub-items under .lustre will be handled.
1233  */
1234 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1235                            uint64_t parent, handle_dirent_t handle_dirent,
1236                            enum osd_lf_flags flags)
1237 {
1238         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1239         const struct osd_lf_map *map;
1240         uint64_t oid;
1241         int rc;
1242         int rc1 = 0;
1243         ENTRY;
1244
1245         rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1246                             sizeof(*zde) / 8, (void *)zde);
1247         if (rc == -ENOENT) {
1248                 /* The .lustre directory is lost. That is not fatal. It can
1249                  * be re-created in the subsequent MDT start processing. */
1250                 RETURN(0);
1251         }
1252
1253         if (rc) {
1254                 CWARN("%s: initial OI scrub failed to find .lustre: "
1255                       "rc = %d\n", osd_name(dev), rc);
1256
1257                 RETURN(rc);
1258         }
1259
1260         oid = zde->lzd_reg.zde_dnode;
1261         rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1262                               dot_lustre_name, 0);
1263         if (rc)
1264                 RETURN(rc);
1265
1266         for (map = osd_dl_maps; map->olm_name; map++) {
1267                 rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1268                                     sizeof(*zde) / 8, (void *)zde);
1269                 if (rc) {
1270                         if (rc != -ENOENT)
1271                                 CWARN("%s: initial OI scrub failed to find the entry %s under .lustre: rc = %d\n",
1272                                       osd_name(dev), map->olm_name, rc);
1273                         else if (!fid_is_zero(&map->olm_fid))
1274                                 /* Try to remove the stale OI mapping. */
1275                                 osd_scrub_refresh_mapping(env, dev,
1276                                                 &map->olm_fid, 0,
1277                                                 DTO_INDEX_DELETE, true,
1278                                                 map->olm_name);
1279                         continue;
1280                 }
1281
1282                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1283                                       zde->lzd_reg.zde_dnode, map->olm_name,
1284                                       map->olm_flags);
1285                 if (rc)
1286                         rc1 = rc;
1287         }
1288
1289         RETURN(rc1);
1290 }
1291
1292 static void osd_initial_OI_scrub(const struct lu_env *env,
1293                                  struct osd_device *dev)
1294 {
1295         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1296         const struct osd_lf_map *map;
1297         int rc;
1298         ENTRY;
1299
1300         for (map = osd_lf_maps; map->olm_name; map++) {
1301                 rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1302                                     sizeof(*zde) / 8, (void *)zde);
1303                 if (rc) {
1304                         if (rc != -ENOENT)
1305                                 CWARN("%s: initial OI scrub failed "
1306                                       "to find the entry %s: rc = %d\n",
1307                                       osd_name(dev), map->olm_name, rc);
1308                         else if (!fid_is_zero(&map->olm_fid))
1309                                 /* Try to remove the stale OI mapping. */
1310                                 osd_scrub_refresh_mapping(env, dev,
1311                                                 &map->olm_fid, 0,
1312                                                 DTO_INDEX_DELETE, true,
1313                                                 map->olm_name);
1314                         continue;
1315                 }
1316
1317                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1318                                       zde->lzd_reg.zde_dnode, map->olm_name,
1319                                       map->olm_flags);
1320                 if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1321                         osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1322                                          map->olm_flags, map->olm_scan_dir,
1323                                          map->olm_handle_dirent);
1324         }
1325
1326         while (!list_empty(&dev->od_ios_list)) {
1327                 struct osd_ios_item *item;
1328
1329                 item = list_entry(dev->od_ios_list.next,
1330                                   struct osd_ios_item, oii_list);
1331                 list_del_init(&item->oii_list);
1332                 item->oii_scan_dir(env, dev, item->oii_parent,
1333                                    item->oii_handle_dirent, item->oii_flags);
1334                 OBD_FREE_PTR(item);
1335         }
1336
1337         if (!list_empty(&dev->od_index_restore_list)) {
1338                 char *buf;
1339
1340                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1341                 if (!buf)
1342                         CERROR("%s: not enough RAM for rebuild index\n",
1343                                osd_name(dev));
1344
1345                 while (!list_empty(&dev->od_index_restore_list)) {
1346                         struct lustre_index_restore_unit *liru;
1347
1348                         liru = list_entry(dev->od_index_restore_list.next,
1349                                           struct lustre_index_restore_unit,
1350                                           liru_link);
1351                         list_del(&liru->liru_link);
1352                         if (buf)
1353                                 osd_index_restore(env, dev, liru, buf,
1354                                                   INDEX_BACKUP_BUFSIZE);
1355                         OBD_FREE(liru, liru->liru_len);
1356                 }
1357
1358                 if (buf)
1359                         OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1360         }
1361
1362         EXIT;
1363 }
1364
1365 /* OI scrub start/stop */
1366
1367 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1368                     __u32 flags)
1369 {
1370         int rc;
1371         ENTRY;
1372
1373         if (dev->od_dt_dev.dd_rdonly)
1374                 RETURN(-EROFS);
1375
1376         /* od_otable_sem: prevent concurrent start/stop */
1377         down(&dev->od_otable_sem);
1378         rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1379         up(&dev->od_otable_sem);
1380
1381         RETURN(rc == -EALREADY ? 0 : rc);
1382 }
1383
1384 void osd_scrub_stop(struct osd_device *dev)
1385 {
1386         struct lustre_scrub *scrub = &dev->od_scrub;
1387         ENTRY;
1388
1389         /* od_otable_sem: prevent concurrent start/stop */
1390         down(&dev->od_otable_sem);
1391         scrub->os_paused = 1;
1392         scrub_stop(scrub);
1393         up(&dev->od_otable_sem);
1394
1395         EXIT;
1396 }
1397
1398 /* OI scrub setup/cleanup */
1399
1400 static const char osd_scrub_name[] = "OI_scrub";
1401
1402 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev)
1403 {
1404         struct osd_thread_info *info = osd_oti_get(env);
1405         struct lustre_scrub *scrub = &dev->od_scrub;
1406         struct scrub_file *sf = &scrub->os_file;
1407         struct lu_fid *fid = &info->oti_fid;
1408         struct dt_object *obj;
1409         uint64_t oid;
1410         int rc = 0;
1411         bool dirty = false;
1412         ENTRY;
1413
1414         memcpy(dev->od_uuid.b,
1415                &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1416                sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1417         memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1418         init_waitqueue_head(&scrub->os_thread.t_ctl_waitq);
1419         init_rwsem(&scrub->os_rwsem);
1420         spin_lock_init(&scrub->os_lock);
1421         INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1422         scrub->os_name = osd_name(dev);
1423
1424         /* 'What the @fid is' is not imporatant, because the object
1425          * has no OI mapping, and only is visible inside the OSD.*/
1426         fid->f_seq = FID_SEQ_IGIF_MAX;
1427         if (dev->od_is_ost)
1428                 fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1429         else
1430                 fid->f_oid = dev->od_index + 1;
1431         fid->f_ver = 0;
1432         rc = osd_obj_find_or_create(env, dev, dev->od_root,
1433                                     osd_scrub_name, &oid, fid, false);
1434         if (rc)
1435                 RETURN(rc);
1436
1437         rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1438         if (rc)
1439                 RETURN(rc);
1440
1441         obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1442         if (IS_ERR_OR_NULL(obj))
1443                 RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1444
1445         obj->do_body_ops = &osd_body_scrub_ops;
1446         scrub->os_obj = obj;
1447         rc = scrub_file_load(env, scrub);
1448         if (rc == -ENOENT || rc == -EFAULT) {
1449                 scrub_file_init(scrub, dev->od_uuid);
1450                 dirty = true;
1451         } else if (rc < 0) {
1452                 GOTO(cleanup_obj, rc);
1453         } else {
1454                 if (!uuid_equal(&sf->sf_uuid, &dev->od_uuid)) {
1455                         CDEBUG(D_LFSCK,
1456                                "%s: UUID has been changed from %pU to %pU\n",
1457                                osd_name(dev), &sf->sf_uuid, &dev->od_uuid);
1458                         scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1459                         dirty = true;
1460                 } else if (sf->sf_status == SS_SCANNING) {
1461                         sf->sf_status = SS_CRASHED;
1462                         dirty = true;
1463                 }
1464
1465                 if ((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0) {
1466                         LCONSOLE_WARN("%s: invalid oi count %d, set it to %d\n",
1467                                       osd_name(dev), sf->sf_oi_count,
1468                                       osd_oi_count);
1469                         sf->sf_oi_count = osd_oi_count;
1470                         dirty = true;
1471                 }
1472         }
1473
1474         if (sf->sf_pos_last_checkpoint != 0)
1475                 scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1476         else
1477                 scrub->os_pos_current = 1;
1478
1479         if (dirty) {
1480                 rc = scrub_file_store(env, scrub);
1481                 if (rc)
1482                         GOTO(cleanup_obj, rc);
1483         }
1484
1485         /* Initialize OI files. */
1486         rc = osd_oi_init(env, dev);
1487         if (rc < 0)
1488                 GOTO(cleanup_obj, rc);
1489
1490         if (!dev->od_dt_dev.dd_rdonly)
1491                 osd_initial_OI_scrub(env, dev);
1492
1493         if (!dev->od_dt_dev.dd_rdonly &&
1494             dev->od_auto_scrub_interval != AS_NEVER &&
1495             ((sf->sf_status == SS_PAUSED) ||
1496              (sf->sf_status == SS_CRASHED &&
1497               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1498                               SF_UPGRADE | SF_AUTO)) ||
1499              (sf->sf_status == SS_INIT &&
1500               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1501                               SF_UPGRADE))))
1502                 rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1503
1504         if (rc)
1505                 GOTO(cleanup_oi, rc);
1506
1507         RETURN(0);
1508
1509 cleanup_oi:
1510         osd_oi_fini(env, dev);
1511 cleanup_obj:
1512         dt_object_put_nocache(env, scrub->os_obj);
1513         scrub->os_obj = NULL;
1514
1515         return rc;
1516 }
1517
1518 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1519 {
1520         struct lustre_scrub *scrub = &dev->od_scrub;
1521
1522         LASSERT(!dev->od_otable_it);
1523
1524         if (scrub->os_obj) {
1525                 osd_scrub_stop(dev);
1526                 dt_object_put_nocache(env, scrub->os_obj);
1527                 scrub->os_obj = NULL;
1528         }
1529
1530         if (dev->od_oi_table)
1531                 osd_oi_fini(env, dev);
1532 }
1533
1534 /* object table based iteration APIs */
1535
1536 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1537                                        struct dt_object *dt, __u32 attr)
1538 {
1539         enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1540         enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1541         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1542         struct lustre_scrub *scrub = &dev->od_scrub;
1543         struct osd_otable_it *it;
1544         __u32 start = 0;
1545         int rc;
1546         ENTRY;
1547
1548         if (dev->od_dt_dev.dd_rdonly)
1549                 RETURN(ERR_PTR(-EROFS));
1550
1551         /* od_otable_sem: prevent concurrent init/fini */
1552         down(&dev->od_otable_sem);
1553         if (dev->od_otable_it)
1554                 GOTO(out, it = ERR_PTR(-EALREADY));
1555
1556         OBD_ALLOC_PTR(it);
1557         if (!it)
1558                 GOTO(out, it = ERR_PTR(-ENOMEM));
1559
1560         if (flags & DOIF_OUTUSED)
1561                 it->ooi_used_outside = 1;
1562
1563         if (flags & DOIF_RESET)
1564                 start |= SS_RESET;
1565
1566         if (valid & DOIV_ERROR_HANDLE) {
1567                 if (flags & DOIF_FAILOUT)
1568                         start |= SS_SET_FAILOUT;
1569                 else
1570                         start |= SS_CLEAR_FAILOUT;
1571         }
1572
1573         if (valid & DOIV_DRYRUN) {
1574                 if (flags & DOIF_DRYRUN)
1575                         start |= SS_SET_DRYRUN;
1576                 else
1577                         start |= SS_CLEAR_DRYRUN;
1578         }
1579
1580         /* XXX: dmu_object_next() does NOT find dnodes allocated
1581          *      in the current non-committed txg, so we force txg
1582          *      commit to find all existing dnodes ... */
1583         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1584
1585         dev->od_otable_it = it;
1586         it->ooi_dev = dev;
1587         rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1588         if (rc == -EALREADY) {
1589                 it->ooi_pos = 1;
1590         } else if (rc < 0) {
1591                 dev->od_otable_it = NULL;
1592                 OBD_FREE_PTR(it);
1593                 it = ERR_PTR(rc);
1594         } else {
1595                 it->ooi_pos = scrub->os_pos_current;
1596         }
1597
1598         GOTO(out, it);
1599
1600 out:
1601         up(&dev->od_otable_sem);
1602         return (struct dt_it *)it;
1603 }
1604
1605 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1606 {
1607         struct osd_otable_it *it = (struct osd_otable_it *)di;
1608         struct osd_device *dev = it->ooi_dev;
1609
1610         /* od_otable_sem: prevent concurrent init/fini */
1611         down(&dev->od_otable_sem);
1612         scrub_stop(&dev->od_scrub);
1613         LASSERT(dev->od_otable_it == it);
1614
1615         dev->od_otable_it = NULL;
1616         up(&dev->od_otable_sem);
1617         OBD_FREE_PTR(it);
1618 }
1619
1620 static int osd_otable_it_get(const struct lu_env *env,
1621                              struct dt_it *di, const struct dt_key *key)
1622 {
1623         return 0;
1624 }
1625
1626 static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
1627 {
1628 }
1629
1630 static void osd_otable_it_preload(const struct lu_env *env,
1631                                   struct osd_otable_it *it)
1632 {
1633         struct osd_device *dev = it->ooi_dev;
1634         int rc;
1635
1636         /* can go negative on the very first access to the iterator
1637          * or if some non-Lustre objects were found */
1638         if (unlikely(it->ooi_prefetched < 0))
1639                 it->ooi_prefetched = 0;
1640
1641         if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
1642                 return;
1643
1644         if (it->ooi_prefetched_dnode == 0)
1645                 it->ooi_prefetched_dnode = it->ooi_pos;
1646
1647         while (it->ooi_prefetched < OTABLE_PREFETCH) {
1648                 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
1649                                       B_FALSE, 0);
1650                 if (rc)
1651                         break;
1652
1653                 osd_dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1654                                  0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1655                 it->ooi_prefetched++;
1656         }
1657 }
1658
1659 static inline int
1660 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1661 {
1662         spin_lock(&scrub->os_lock);
1663         if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1664             !thread_is_running(&scrub->os_thread))
1665                 it->ooi_waiting = 0;
1666         else
1667                 it->ooi_waiting = 1;
1668         spin_unlock(&scrub->os_lock);
1669
1670         return !it->ooi_waiting;
1671 }
1672
1673 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1674 {
1675         struct osd_otable_it *it = (struct osd_otable_it *)di;
1676         struct osd_device *dev = it->ooi_dev;
1677         struct lustre_scrub *scrub = &dev->od_scrub;
1678         struct ptlrpc_thread *thread = &scrub->os_thread;
1679         struct lustre_mdt_attrs *lma = NULL;
1680         nvlist_t *nvbuf = NULL;
1681         int rc, size = 0;
1682         bool locked;
1683         ENTRY;
1684
1685         LASSERT(it->ooi_user_ready);
1686         fid_zero(&it->ooi_fid);
1687
1688         if (unlikely(it->ooi_all_cached))
1689                 RETURN(1);
1690
1691 again:
1692         if (nvbuf) {
1693                 nvlist_free(nvbuf);
1694                 nvbuf = NULL;
1695                 lma = NULL;
1696                 size = 0;
1697         }
1698
1699         if (it->ooi_pos >= scrub->os_pos_current)
1700                 wait_event_idle(thread->t_ctl_waitq,
1701                                 osd_otable_it_wakeup(scrub, it));
1702
1703         if (!thread_is_running(thread) && !it->ooi_used_outside)
1704                 GOTO(out, rc = 1);
1705
1706         rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
1707         if (rc) {
1708                 if (unlikely(rc == -ESRCH)) {
1709                         it->ooi_all_cached = 1;
1710                         rc = 1;
1711                 }
1712
1713                 GOTO(out, rc);
1714         }
1715
1716         rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1717
1718         locked = false;
1719         if (!scrub->os_full_speed) {
1720                 spin_lock(&scrub->os_lock);
1721                 locked = true;
1722         }
1723         it->ooi_prefetched--;
1724         if (!scrub->os_full_speed) {
1725                 if (scrub->os_waiting) {
1726                         scrub->os_waiting = 0;
1727                         wake_up_all(&thread->t_ctl_waitq);
1728                 }
1729         }
1730         if (locked)
1731                 spin_unlock(&scrub->os_lock);
1732
1733         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1734                 goto again;
1735
1736         if (rc)
1737                 GOTO(out, rc);
1738
1739         LASSERT(nvbuf != NULL);
1740         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1741                                        (uchar_t **)&lma, &size);
1742         if (rc || size == 0)
1743                 /* It is either non-Lustre object or OSD internal object,
1744                  * ignore it, go ahead */
1745                 goto again;
1746
1747         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1748         lustre_lma_swab(lma);
1749         if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1750                      lma->lma_incompat & LMAI_AGENT))
1751                 goto again;
1752
1753         it->ooi_fid = lma->lma_self_fid;
1754
1755         GOTO(out, rc = 0);
1756
1757 out:
1758         if (nvbuf)
1759                 nvlist_free(nvbuf);
1760
1761         if (!rc && scrub->os_full_speed)
1762                 osd_otable_it_preload(env, it);
1763
1764         return rc;
1765 }
1766
1767 static struct dt_key *osd_otable_it_key(const struct lu_env *env,
1768                                         const struct dt_it *di)
1769 {
1770         return NULL;
1771 }
1772
1773 static int osd_otable_it_key_size(const struct lu_env *env,
1774                                   const struct dt_it *di)
1775 {
1776         return sizeof(__u64);
1777 }
1778
1779 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1780                              struct dt_rec *rec, __u32 attr)
1781 {
1782         struct osd_otable_it *it  = (struct osd_otable_it *)di;
1783         struct lu_fid *fid = (struct lu_fid *)rec;
1784
1785         *fid = it->ooi_fid;
1786         return 0;
1787 }
1788
1789 static __u64 osd_otable_it_store(const struct lu_env *env,
1790                                  const struct dt_it *di)
1791 {
1792         struct osd_otable_it *it = (struct osd_otable_it *)di;
1793
1794         return it->ooi_pos;
1795 }
1796
1797 /**
1798  * Set the OSD layer iteration start position as the specified hash.
1799  */
1800 static int osd_otable_it_load(const struct lu_env *env,
1801                               const struct dt_it *di, __u64 hash)
1802 {
1803         struct osd_otable_it *it = (struct osd_otable_it *)di;
1804         struct osd_device *dev = it->ooi_dev;
1805         struct lustre_scrub *scrub = &dev->od_scrub;
1806         int rc;
1807         ENTRY;
1808
1809         /* Forbid to set iteration position after iteration started. */
1810         if (it->ooi_user_ready)
1811                 RETURN(-EPERM);
1812
1813         if (hash > OSD_OTABLE_MAX_HASH)
1814                 hash = OSD_OTABLE_MAX_HASH;
1815
1816         /* The hash is the last checkpoint position,
1817          * we will start from the next one. */
1818         it->ooi_pos = hash + 1;
1819         it->ooi_prefetched = 0;
1820         it->ooi_prefetched_dnode = 0;
1821         it->ooi_user_ready = 1;
1822         if (!scrub->os_full_speed)
1823                 wake_up_all(&scrub->os_thread.t_ctl_waitq);
1824
1825         /* Unplug OSD layer iteration by the first next() call. */
1826         rc = osd_otable_it_next(env, (struct dt_it *)it);
1827
1828         RETURN(rc);
1829 }
1830
1831 static int osd_otable_it_key_rec(const struct lu_env *env,
1832                                  const struct dt_it *di, void *key_rec)
1833 {
1834         return 0;
1835 }
1836
1837 const struct dt_index_operations osd_otable_ops = {
1838         .dio_it = {
1839                 .init     = osd_otable_it_init,
1840                 .fini     = osd_otable_it_fini,
1841                 .get      = osd_otable_it_get,
1842                 .put      = osd_otable_it_put,
1843                 .next     = osd_otable_it_next,
1844                 .key      = osd_otable_it_key,
1845                 .key_size = osd_otable_it_key_size,
1846                 .rec      = osd_otable_it_rec,
1847                 .store    = osd_otable_it_store,
1848                 .load     = osd_otable_it_load,
1849                 .key_rec  = osd_otable_it_key_rec,
1850         }
1851 };
1852
1853 /* high priority inconsistent items list APIs */
1854
1855 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1856                    const struct lu_fid *fid, uint64_t oid, bool insert)
1857 {
1858         struct lustre_scrub *scrub = &dev->od_scrub;
1859         struct ptlrpc_thread *thread = &scrub->os_thread;
1860         struct osd_inconsistent_item *oii;
1861         bool wakeup = false;
1862         ENTRY;
1863
1864         osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1865         OBD_ALLOC_PTR(oii);
1866         if (unlikely(!oii))
1867                 RETURN(-ENOMEM);
1868
1869         INIT_LIST_HEAD(&oii->oii_list);
1870         oii->oii_cache.oic_dev = dev;
1871         oii->oii_cache.oic_fid = *fid;
1872         oii->oii_cache.oic_dnode = oid;
1873         oii->oii_insert = insert;
1874
1875         spin_lock(&scrub->os_lock);
1876         if (unlikely(!thread_is_running(thread))) {
1877                 spin_unlock(&scrub->os_lock);
1878                 OBD_FREE_PTR(oii);
1879                 RETURN(-EAGAIN);
1880         }
1881
1882         if (list_empty(&scrub->os_inconsistent_items))
1883                 wakeup = true;
1884         list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1885         spin_unlock(&scrub->os_lock);
1886
1887         if (wakeup)
1888                 wake_up_all(&thread->t_ctl_waitq);
1889
1890         RETURN(0);
1891 }
1892
1893 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1894                    uint64_t *oid)
1895 {
1896         struct lustre_scrub *scrub = &dev->od_scrub;
1897         struct osd_inconsistent_item *oii;
1898         int ret = -ENOENT;
1899         ENTRY;
1900
1901         spin_lock(&scrub->os_lock);
1902         list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1903                 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1904                         *oid = oii->oii_cache.oic_dnode;
1905                         ret = 0;
1906                         break;
1907                 }
1908         }
1909         spin_unlock(&scrub->os_lock);
1910
1911         RETURN(ret);
1912 }