Whamcloud - gitweb
247973afbe367f74f9b475ee533107a706d01a0a
[fs/lustre-release.git] / lustre / osd-zfs / osd_scrub.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osd-zfs/osd_scrub.c
27  *
28  * Top-level entry points into osd module
29  *
30  * The OI scrub is used for rebuilding Object Index files when restores MDT from
31  * file-level backup.
32  *
33  * The otable based iterator scans ZFS objects to feed up layer LFSCK.
34  *
35  * Author: Fan Yong <fan.yong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LFSCK
39
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49 #include <sys/zap_impl.h>
50 #include <sys/zap.h>
51 #include <sys/zap_leaf.h>
52
53 #include "osd_internal.h"
54
55 #define OSD_OTABLE_MAX_HASH             ((1ULL << 48) - 1)
56 #define OTABLE_PREFETCH                 256
57
58 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
59 {
60         return it->ooi_prefetched < OTABLE_PREFETCH;
61 }
62
63 /**
64  * update/insert/delete the specified OI mapping (@fid @id) according to the ops
65  *
66  * \retval   1, changed nothing
67  * \retval   0, changed successfully
68  * \retval -ve, on error
69  */
70 int osd_scrub_refresh_mapping(const struct lu_env *env,
71                               struct osd_device *dev,
72                               const struct lu_fid *fid,
73                               uint64_t oid, enum dt_txn_op ops,
74                               bool force, const char *name)
75 {
76         struct osd_thread_info *info = osd_oti_get(env);
77         struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
78         char *buf = info->oti_str;
79         dmu_tx_t *tx = NULL;
80         dnode_t *dn = NULL;
81         uint64_t zapid;
82         int rc;
83         ENTRY;
84
85         if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
86                 GOTO(log, rc = 0);
87
88         tx = dmu_tx_create(dev->od_os);
89         if (!tx)
90                 GOTO(log, rc = -ENOMEM);
91
92         zapid = osd_get_name_n_idx(env, dev, fid, buf,
93                                    sizeof(info->oti_str), &dn);
94         osd_tx_hold_zap(tx, zapid, dn,
95                         ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
96         rc = -dmu_tx_assign(tx, TXG_WAIT);
97         if (rc) {
98                 dmu_tx_abort(tx);
99                 GOTO(log, rc);
100         }
101
102         switch (ops) {
103         case DTO_INDEX_UPDATE:
104                 zde->zde_pad = 0;
105                 zde->zde_dnode = oid;
106                 zde->zde_type = 0; /* The type in OI mapping is useless. */
107                 rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
108                                  zde, tx);
109                 if (unlikely(rc == -ENOENT)) {
110                         /* Some unlink thread may removed the OI mapping. */
111                         rc = 1;
112                 }
113                 break;
114         case DTO_INDEX_INSERT:
115                 zde->zde_pad = 0;
116                 zde->zde_dnode = oid;
117                 zde->zde_type = 0; /* The type in OI mapping is useless. */
118                 rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
119                                  zde, tx);
120                 if (unlikely(rc == -EEXIST))
121                         rc = 1;
122                 break;
123         case DTO_INDEX_DELETE:
124                 rc = osd_zap_remove(dev, zapid, dn, buf, tx);
125                 if (rc == -ENOENT) {
126                         /* It is normal that the unlink thread has removed the
127                          * OI mapping already. */
128                         rc = 1;
129                 }
130                 break;
131         default:
132                 LASSERTF(0, "Unexpected ops %d\n", ops);
133                 rc = -EINVAL;
134                 break;
135         }
136
137         dmu_tx_commit(tx);
138         GOTO(log, rc);
139
140 log:
141         CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
142                DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
143                force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
144
145         return rc;
146 }
147
148 static int
149 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
150                        const struct lu_fid *fid, uint64_t oid, int val)
151 {
152         struct lustre_scrub *scrub = &dev->od_scrub;
153         struct scrub_file *sf = &scrub->os_file;
154         struct osd_inconsistent_item *oii = NULL;
155         nvlist_t *nvbuf = NULL;
156         dnode_t *dn = NULL;
157         uint64_t oid2;
158         int ops = DTO_INDEX_UPDATE;
159         int rc;
160         ENTRY;
161
162         down_write(&scrub->os_rwsem);
163         scrub->os_new_checked++;
164         if (val < 0)
165                 GOTO(out, rc = val);
166
167         if (scrub->os_in_prior)
168                 oii = list_entry(scrub->os_inconsistent_items.next,
169                                  struct osd_inconsistent_item, oii_list);
170
171         if (oid < sf->sf_pos_latest_start && !oii)
172                 GOTO(out, rc = 0);
173
174         if (oii && oii->oii_insert) {
175                 ops = DTO_INDEX_INSERT;
176                 goto zget;
177         }
178
179         rc = osd_fid_lookup(env, dev, fid, &oid2);
180         if (rc) {
181                 if (rc != -ENOENT)
182                         GOTO(out, rc);
183
184                 ops = DTO_INDEX_INSERT;
185
186 zget:
187                 rc = __osd_obj2dnode(dev->od_os, oid, &dn);
188                 if (rc) {
189                         /* Someone removed the object by race. */
190                         if (rc == -ENOENT || rc == -EEXIST)
191                                 rc = 0;
192                         GOTO(out, rc);
193                 }
194
195                 spin_lock(&scrub->os_lock);
196                 scrub->os_full_speed = 1;
197                 spin_unlock(&scrub->os_lock);
198
199                 sf->sf_flags |= SF_INCONSISTENT;
200         } else if (oid == oid2) {
201                 GOTO(out, rc = 0);
202         } else {
203                 struct lustre_mdt_attrs *lma = NULL;
204                 int size;
205
206                 rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
207                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
208                         goto update;
209                 if (rc)
210                         GOTO(out, rc);
211
212                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
213                                                (uchar_t **)&lma, &size);
214                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
215                         goto update;
216                 if (rc)
217                         GOTO(out, rc);
218
219                 lustre_lma_swab(lma);
220                 if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
221                         CDEBUG(D_LFSCK, "%s: the FID "DFID" is used by "
222                                "two objects: %llu and %llu (in OI)\n",
223                                osd_name(dev), PFID(fid), oid, oid2);
224
225                         GOTO(out, rc = -EEXIST);
226                 }
227
228 update:
229                 spin_lock(&scrub->os_lock);
230                 scrub->os_full_speed = 1;
231                 spin_unlock(&scrub->os_lock);
232                 sf->sf_flags |= SF_INCONSISTENT;
233         }
234
235         rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
236         if (!rc) {
237                 if (scrub->os_in_prior)
238                         sf->sf_items_updated_prior++;
239                 else
240                         sf->sf_items_updated++;
241         }
242
243         GOTO(out, rc);
244
245 out:
246         if (dev->od_is_ost) {
247                 sa_handle_t *hdl;
248                 uint64_t nlink, mode;
249
250                 rc = -sa_handle_get(dev->od_os, oid, NULL, SA_HDL_PRIVATE,
251                                     &hdl);
252                 if (rc)
253                         GOTO(cleanup, rc);
254
255                 rc = -sa_lookup(hdl, SA_ZPL_MODE(dev), &mode, sizeof(mode));
256                 if (rc || !S_ISREG(mode)) {
257                         sa_handle_destroy(hdl);
258                         GOTO(cleanup, rc);
259                 }
260
261                 rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
262                 if (rc == 0 && nlink > 1)
263                         scrub->os_has_ml_file = 1;
264
265                 sa_handle_destroy(hdl);
266         }
267
268 cleanup:
269         if (nvbuf)
270                 nvlist_free(nvbuf);
271
272         if (rc < 0) {
273                 sf->sf_items_failed++;
274                 if (sf->sf_pos_first_inconsistent == 0 ||
275                     sf->sf_pos_first_inconsistent > oid)
276                         sf->sf_pos_first_inconsistent = oid;
277         } else {
278                 rc = 0;
279         }
280
281         /* There may be conflict unlink during the OI scrub,
282          * if happend, then remove the new added OI mapping. */
283         if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
284                 osd_scrub_refresh_mapping(env, dev, fid, oid,
285                                           DTO_INDEX_DELETE, false, NULL);
286         up_write(&scrub->os_rwsem);
287
288         if (dn)
289                 osd_dnode_rele(dn);
290
291         if (oii) {
292                 spin_lock(&scrub->os_lock);
293                 if (likely(!list_empty(&oii->oii_list)))
294                         list_del(&oii->oii_list);
295                 spin_unlock(&scrub->os_lock);
296                 OBD_FREE_PTR(oii);
297         }
298
299         RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
300 }
301
302 static int osd_scrub_prep(const struct lu_env *env, struct osd_device *dev)
303 {
304         struct lustre_scrub *scrub = &dev->od_scrub;
305         struct scrub_file *sf = &scrub->os_file;
306         __u32 flags = scrub->os_start_flags;
307         int rc;
308         bool drop_dryrun = false;
309         ENTRY;
310
311         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
312                scrub->os_name, flags);
313
314         down_write(&scrub->os_rwsem);
315         if (flags & SS_SET_FAILOUT)
316                 sf->sf_param |= SP_FAILOUT;
317         else if (flags & SS_CLEAR_FAILOUT)
318                 sf->sf_param &= ~SP_FAILOUT;
319
320         if (flags & SS_SET_DRYRUN) {
321                 sf->sf_param |= SP_DRYRUN;
322         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
323                 sf->sf_param &= ~SP_DRYRUN;
324                 drop_dryrun = true;
325         }
326
327         if (flags & SS_RESET)
328                 scrub_file_reset(scrub, dev->od_uuid, 0);
329
330         spin_lock(&scrub->os_lock);
331         scrub->os_partial_scan = 0;
332         if (flags & SS_AUTO_FULL) {
333                 scrub->os_full_speed = 1;
334                 sf->sf_flags |= SF_AUTO;
335         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
336                                    SF_UPGRADE)) {
337                 scrub->os_full_speed = 1;
338         } else {
339                 scrub->os_full_speed = 0;
340         }
341
342         scrub->os_in_prior = 0;
343         scrub->os_waiting = 0;
344         scrub->os_paused = 0;
345         scrub->os_in_join = 0;
346         scrub->os_full_scrub = 0;
347         spin_unlock(&scrub->os_lock);
348         scrub->os_new_checked = 0;
349         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
350                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
351         else if (sf->sf_pos_last_checkpoint != 0)
352                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
353         else
354                 sf->sf_pos_latest_start = 1;
355
356         scrub->os_pos_current = sf->sf_pos_latest_start;
357         sf->sf_status = SS_SCANNING;
358         sf->sf_time_latest_start = ktime_get_real_seconds();
359         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
360         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
361         rc = scrub_file_store(env, scrub);
362         if (!rc) {
363                 spin_lock(&scrub->os_lock);
364                 scrub->os_running = 1;
365                 spin_unlock(&scrub->os_lock);
366                 wake_up_var(scrub);
367         }
368         up_write(&scrub->os_rwsem);
369
370         RETURN(rc);
371 }
372
373 static int osd_scrub_post(const struct lu_env *env, struct osd_device *dev,
374                           int result)
375 {
376         struct lustre_scrub *scrub = &dev->od_scrub;
377         struct scrub_file *sf = &scrub->os_file;
378         int rc;
379         ENTRY;
380
381         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
382                scrub->os_name, result);
383
384         down_write(&scrub->os_rwsem);
385         spin_lock(&scrub->os_lock);
386         scrub->os_running = 0;
387         spin_unlock(&scrub->os_lock);
388         if (scrub->os_new_checked > 0) {
389                 sf->sf_items_checked += scrub->os_new_checked;
390                 scrub->os_new_checked = 0;
391                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
392         }
393         sf->sf_time_last_checkpoint = ktime_get_real_seconds();
394         if (result > 0) {
395                 sf->sf_status = SS_COMPLETED;
396                 if (!(sf->sf_param & SP_DRYRUN)) {
397                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
398                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
399                                           SF_UPGRADE | SF_AUTO);
400                 }
401                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
402                 sf->sf_success_count++;
403         } else if (result == 0) {
404                 if (scrub->os_paused)
405                         sf->sf_status = SS_PAUSED;
406                 else
407                         sf->sf_status = SS_STOPPED;
408         } else {
409                 sf->sf_status = SS_FAILED;
410         }
411         sf->sf_run_time += ktime_get_seconds() -
412                            scrub->os_time_last_checkpoint;
413
414         rc = scrub_file_store(env, scrub);
415         up_write(&scrub->os_rwsem);
416
417         RETURN(rc < 0 ? rc : result);
418 }
419
420 /* iteration engine */
421
422 static inline int
423 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
424 {
425         spin_lock(&scrub->os_lock);
426         if (osd_scrub_has_window(it) ||
427             !list_empty(&scrub->os_inconsistent_items) ||
428             it->ooi_waiting || kthread_should_stop())
429                 scrub->os_waiting = 0;
430         else
431                 scrub->os_waiting = 1;
432         spin_unlock(&scrub->os_lock);
433
434         return !scrub->os_waiting;
435 }
436
437 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
438                           struct lu_fid *fid, uint64_t *oid)
439 {
440         struct lustre_scrub *scrub = &dev->od_scrub;
441         struct osd_otable_it *it = dev->od_otable_it;
442         struct lustre_mdt_attrs *lma = NULL;
443         nvlist_t *nvbuf = NULL;
444         int size = 0;
445         int rc = 0;
446         ENTRY;
447
448         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
449                 wait_var_event_timeout(
450                         scrub,
451                         !list_empty(&scrub->os_inconsistent_items) ||
452                         kthread_should_stop(),
453                         cfs_time_seconds(cfs_fail_val));
454
455                 if (kthread_should_stop())
456                         RETURN(SCRUB_NEXT_EXIT);
457         }
458
459         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
460                 spin_lock(&scrub->os_lock);
461                 scrub->os_running = 0;
462                 spin_unlock(&scrub->os_lock);
463                 RETURN(SCRUB_NEXT_CRASH);
464         }
465
466         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
467                 RETURN(SCRUB_NEXT_FATAL);
468
469 again:
470         if (nvbuf) {
471                 nvlist_free(nvbuf);
472                 nvbuf = NULL;
473                 lma = NULL;
474         }
475
476         if (!list_empty(&scrub->os_inconsistent_items)) {
477                 spin_lock(&scrub->os_lock);
478                 if (likely(!list_empty(&scrub->os_inconsistent_items))) {
479                         struct osd_inconsistent_item *oii;
480
481                         oii = list_entry(scrub->os_inconsistent_items.next,
482                                 struct osd_inconsistent_item, oii_list);
483                         *fid = oii->oii_cache.oic_fid;
484                         *oid = oii->oii_cache.oic_dnode;
485                         scrub->os_in_prior = 1;
486                         spin_unlock(&scrub->os_lock);
487
488                         GOTO(out, rc = 0);
489                 }
490                 spin_unlock(&scrub->os_lock);
491         }
492
493         if (!scrub->os_full_speed && !osd_scrub_has_window(it))
494                 wait_var_event(scrub, osd_scrub_wakeup(scrub, it));
495
496         if (kthread_should_stop())
497                 GOTO(out, rc = SCRUB_NEXT_EXIT);
498
499         rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
500         if (rc)
501                 GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
502
503         rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
504         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
505                 goto again;
506
507         if (rc)
508                 GOTO(out, rc);
509
510         LASSERT(nvbuf != NULL);
511         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
512                                        (uchar_t **)&lma, &size);
513         if (!rc) {
514                 lustre_lma_swab(lma);
515                 if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
516                            !(lma->lma_incompat & LMAI_AGENT))) {
517                         *fid = lma->lma_self_fid;
518                         *oid = scrub->os_pos_current;
519
520                         GOTO(out, rc = 0);
521                 }
522         }
523
524         if (!scrub->os_full_speed) {
525                 spin_lock(&scrub->os_lock);
526                 it->ooi_prefetched++;
527                 if (it->ooi_waiting) {
528                         it->ooi_waiting = 0;
529                         wake_up_var(scrub);
530                 }
531                 spin_unlock(&scrub->os_lock);
532         }
533
534         goto again;
535
536 out:
537         if (nvbuf)
538                 nvlist_free(nvbuf);
539
540         return rc;
541 }
542
543 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
544                           const struct lu_fid *fid, uint64_t oid, int rc)
545 {
546         struct lustre_scrub *scrub = &dev->od_scrub;
547         struct osd_otable_it *it = dev->od_otable_it;
548
549         rc = osd_scrub_check_update(env, dev, fid, oid, rc);
550         if (!scrub->os_in_prior) {
551                 if (!scrub->os_full_speed) {
552                         spin_lock(&scrub->os_lock);
553                         it->ooi_prefetched++;
554                         if (it->ooi_waiting) {
555                                 it->ooi_waiting = 0;
556                                 wake_up_var(scrub);
557                         }
558                         spin_unlock(&scrub->os_lock);
559                 }
560         } else {
561                 spin_lock(&scrub->os_lock);
562                 scrub->os_in_prior = 0;
563                 spin_unlock(&scrub->os_lock);
564         }
565
566         if (rc)
567                 return rc;
568
569         rc = scrub_checkpoint(env, scrub);
570         if (rc) {
571                 CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: "
572                        "rc = %d\n", scrub->os_name, scrub->os_pos_current, rc);
573                 /* Continue, as long as the scrub itself can go ahead. */
574         }
575
576         return 0;
577 }
578
579 static int osd_scan_ml_file_main(const struct lu_env *env,
580                                  struct osd_device *dev);
581
582 static int osd_scrub_main(void *args)
583 {
584         struct lu_env env;
585         struct osd_device *dev = (struct osd_device *)args;
586         struct lustre_scrub *scrub = &dev->od_scrub;
587         struct lu_fid *fid;
588         uint64_t oid;
589         int rc = 0, ret;
590         ENTRY;
591
592         rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
593         if (rc) {
594                 CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
595                        scrub->os_name, rc);
596                 GOTO(noenv, rc);
597         }
598
599         rc = osd_scrub_prep(&env, dev);
600         if (rc) {
601                 CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
602                        scrub->os_name, rc);
603                 GOTO(out, rc);
604         }
605
606         if (!scrub->os_full_speed) {
607                 struct osd_otable_it *it = dev->od_otable_it;
608
609                 wait_var_event(scrub,
610                                it->ooi_user_ready ||
611                                kthread_should_stop());
612
613                 if (kthread_should_stop())
614                         GOTO(post, rc = 0);
615
616                 scrub->os_pos_current = it->ooi_pos;
617         }
618
619         CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
620                scrub->os_name, scrub->os_start_flags,
621                scrub->os_pos_current);
622
623         fid = &osd_oti_get(&env)->oti_fid;
624         while (!rc && !kthread_should_stop()) {
625                 rc = osd_scrub_next(&env, dev, fid, &oid);
626                 switch (rc) {
627                 case SCRUB_NEXT_EXIT:
628                         GOTO(post, rc = 0);
629                 case SCRUB_NEXT_CRASH:
630                         spin_lock(&scrub->os_lock);
631                         scrub->os_running = 0;
632                         spin_unlock(&scrub->os_lock);
633                         GOTO(out, rc = -EINVAL);
634                 case SCRUB_NEXT_FATAL:
635                         GOTO(post, rc = -EINVAL);
636                 case SCRUB_NEXT_BREAK:
637                         GOTO(post, rc = 1);
638                 }
639
640                 rc = osd_scrub_exec(&env, dev, fid, oid, rc);
641         }
642
643         GOTO(post, rc);
644
645 post:
646         if (scrub->os_has_ml_file) {
647                 ret = osd_scan_ml_file_main(&env, dev);
648                 if (ret != 0)
649                         rc = ret;
650         }
651
652         rc = osd_scrub_post(&env, dev, rc);
653         CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
654                scrub->os_name, scrub->os_pos_current, rc);
655
656 out:
657         while (!list_empty(&scrub->os_inconsistent_items)) {
658                 struct osd_inconsistent_item *oii;
659
660                 oii = list_entry(scrub->os_inconsistent_items.next,
661                                  struct osd_inconsistent_item, oii_list);
662                 list_del_init(&oii->oii_list);
663                 OBD_FREE_PTR(oii);
664         }
665
666         lu_env_fini(&env);
667
668 noenv:
669         spin_lock(&scrub->os_lock);
670         scrub->os_running = 0;
671         spin_unlock(&scrub->os_lock);
672         if (xchg(&scrub->os_task, NULL) == NULL)
673                 /* scrub_stop is waiting, we need to synchronize */
674                 wait_var_event(scrub, kthread_should_stop());
675         wake_up_var(scrub);
676         return rc;
677 }
678
679 /* initial OI scrub */
680
681 struct osd_lf_map;
682
683 typedef int (*handle_dirent_t)(const struct lu_env *, struct osd_device *,
684                                const char *, uint64_t, uint64_t,
685                                enum osd_lf_flags, bool);
686 static int osd_ios_varfid_hd(const struct lu_env *, struct osd_device *,
687                              const char *, uint64_t, uint64_t,
688                              enum osd_lf_flags, bool);
689 static int osd_ios_uld_hd(const struct lu_env *, struct osd_device *,
690                           const char *, uint64_t, uint64_t,
691                           enum osd_lf_flags, bool);
692
693 typedef int (*scan_dir_t)(const struct lu_env *, struct osd_device *,
694                           uint64_t, handle_dirent_t, enum osd_lf_flags);
695 static int osd_ios_general_sd(const struct lu_env *, struct osd_device *,
696                               uint64_t, handle_dirent_t, enum osd_lf_flags);
697 static int osd_ios_ROOT_sd(const struct lu_env *, struct osd_device *,
698                            uint64_t, handle_dirent_t, enum osd_lf_flags);
699
700 struct osd_lf_map {
701         char                    *olm_name;
702         struct lu_fid            olm_fid;
703         enum osd_lf_flags        olm_flags;
704         scan_dir_t               olm_scan_dir;
705         handle_dirent_t          olm_handle_dirent;
706 };
707
708 /* Add the new introduced local files in the list in the future. */
709 static const struct osd_lf_map osd_lf_maps[] = {
710         /* CONFIGS */
711         {
712                 .olm_name               = MOUNT_CONFIGS_DIR,
713                 .olm_fid                = {
714                         .f_seq  = FID_SEQ_LOCAL_FILE,
715                         .f_oid  = MGS_CONFIGS_OID,
716                 },
717                 .olm_flags              = OLF_SCAN_SUBITEMS,
718                 .olm_scan_dir           = osd_ios_general_sd,
719                 .olm_handle_dirent      = osd_ios_varfid_hd,
720         },
721
722         /* NIDTBL_VERSIONS */
723         {
724                 .olm_name               = MGS_NIDTBL_DIR,
725                 .olm_flags              = OLF_SCAN_SUBITEMS,
726                 .olm_scan_dir           = osd_ios_general_sd,
727                 .olm_handle_dirent      = osd_ios_varfid_hd,
728         },
729
730         /* PENDING */
731         {
732                 .olm_name               = MDT_ORPHAN_DIR,
733         },
734
735         /* ROOT */
736         {
737                 .olm_name               = "ROOT",
738                 .olm_fid                = {
739                         .f_seq  = FID_SEQ_ROOT,
740                         .f_oid  = FID_OID_ROOT,
741                 },
742                 .olm_flags              = OLF_SCAN_SUBITEMS,
743                 .olm_scan_dir           = osd_ios_ROOT_sd,
744         },
745
746         /* fld */
747         {
748                 .olm_name               = "fld",
749                 .olm_fid                = {
750                         .f_seq  = FID_SEQ_LOCAL_FILE,
751                         .f_oid  = FLD_INDEX_OID,
752                 },
753         },
754
755         /* changelog_catalog */
756         {
757                 .olm_name               = CHANGELOG_CATALOG,
758         },
759
760         /* changelog_users */
761         {
762                 .olm_name               = CHANGELOG_USERS,
763         },
764
765         /* quota_master */
766         {
767                 .olm_name               = QMT_DIR,
768                 .olm_flags              = OLF_SCAN_SUBITEMS,
769                 .olm_scan_dir           = osd_ios_general_sd,
770                 .olm_handle_dirent      = osd_ios_varfid_hd,
771         },
772
773         /* quota_slave */
774         {
775                 .olm_name               = QSD_DIR,
776                 .olm_flags              = OLF_SCAN_SUBITEMS,
777                 .olm_scan_dir           = osd_ios_general_sd,
778                 .olm_handle_dirent      = osd_ios_varfid_hd,
779         },
780
781         /* LFSCK */
782         {
783                 .olm_name               = LFSCK_DIR,
784                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
785                 .olm_scan_dir           = osd_ios_general_sd,
786                 .olm_handle_dirent      = osd_ios_varfid_hd,
787         },
788
789         /* lfsck_bookmark */
790         {
791                 .olm_name               = LFSCK_BOOKMARK,
792         },
793
794         /* lfsck_layout */
795         {
796                 .olm_name               = LFSCK_LAYOUT,
797         },
798
799         /* lfsck_namespace */
800         {
801                 .olm_name               = LFSCK_NAMESPACE,
802         },
803
804         /* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
805          * and f_oid = index for their log files.  See lu_update_log{_dir}_fid()
806          * for more details. */
807
808         /* update_log */
809         {
810                 .olm_name               = "update_log",
811                 .olm_fid                = {
812                         .f_seq  = FID_SEQ_UPDATE_LOG,
813                 },
814                 .olm_flags              = OLF_IDX_IN_FID,
815         },
816
817         /* update_log_dir */
818         {
819                 .olm_name               = "update_log_dir",
820                 .olm_fid        = {
821                         .f_seq  = FID_SEQ_UPDATE_LOG_DIR,
822                 },
823                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
824                 .olm_scan_dir           = osd_ios_general_sd,
825                 .olm_handle_dirent      = osd_ios_uld_hd,
826         },
827
828         /* hsm_actions */
829         {
830                 .olm_name               = HSM_ACTIONS,
831         },
832
833         /* nodemap */
834         {
835                 .olm_name               = LUSTRE_NODEMAP_NAME,
836         },
837
838         /* index_backup */
839         {
840                 .olm_name               = INDEX_BACKUP_DIR,
841                 .olm_fid                = {
842                         .f_seq  = FID_SEQ_LOCAL_FILE,
843                         .f_oid  = INDEX_BACKUP_OID,
844                 },
845                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
846                 .olm_scan_dir           = osd_ios_general_sd,
847                 .olm_handle_dirent      = osd_ios_varfid_hd,
848         },
849
850         {
851                 .olm_name               = NULL
852         }
853 };
854
855 /* Add the new introduced files under .lustre/ in the list in the future. */
856 static const struct osd_lf_map osd_dl_maps[] = {
857         /* .lustre/fid */
858         {
859                 .olm_name               = "fid",
860                 .olm_fid                = {
861                         .f_seq  = FID_SEQ_DOT_LUSTRE,
862                         .f_oid  = FID_OID_DOT_LUSTRE_OBF,
863                 },
864         },
865
866         /* .lustre/lost+found */
867         {
868                 .olm_name               = "lost+found",
869                 .olm_fid                = {
870                         .f_seq  = FID_SEQ_DOT_LUSTRE,
871                         .f_oid  = FID_OID_DOT_LUSTRE_LPF,
872                 },
873         },
874
875         {
876                 .olm_name               = NULL
877         }
878 };
879
880 struct osd_ios_item {
881         struct list_head        oii_list;
882         uint64_t                oii_parent;
883         enum osd_lf_flags       oii_flags;
884         scan_dir_t              oii_scan_dir;
885         handle_dirent_t         oii_handle_dirent;
886 };
887
888 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
889                             enum osd_lf_flags flags, scan_dir_t scan_dir,
890                             handle_dirent_t handle_dirent)
891 {
892         struct osd_ios_item *item;
893
894         OBD_ALLOC_PTR(item);
895         if (!item) {
896                 CWARN("%s: initial OI scrub failed to add item for %llu\n",
897                       osd_name(dev), parent);
898                 return -ENOMEM;
899         }
900
901         INIT_LIST_HEAD(&item->oii_list);
902         item->oii_parent = parent;
903         item->oii_flags = flags;
904         item->oii_scan_dir = scan_dir;
905         item->oii_handle_dirent = handle_dirent;
906         list_add_tail(&item->oii_list, &dev->od_ios_list);
907
908         return 0;
909 }
910
911 static bool osd_index_need_recreate(const struct lu_env *env,
912                                     struct osd_device *dev, uint64_t oid)
913 {
914         struct osd_thread_info *info = osd_oti_get(env);
915         zap_attribute_t *za = &info->oti_za2;
916         zap_cursor_t *zc = &info->oti_zc2;
917         int rc;
918         ENTRY;
919
920         zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
921         rc = -zap_cursor_retrieve(zc, za);
922         zap_cursor_fini(zc);
923         if (rc && rc != -ENOENT)
924                 RETURN(true);
925
926         RETURN(false);
927 }
928
929 static void osd_ios_index_register(const struct lu_env *env,
930                                    struct osd_device *osd,
931                                    const struct lu_fid *fid, uint64_t oid)
932 {
933         struct osd_thread_info *info = osd_oti_get(env);
934         zap_attribute_t *za = &info->oti_za2;
935         zap_cursor_t *zc = &info->oti_zc2;
936         struct zap_leaf_entry *le;
937         dnode_t *dn = NULL;
938         sa_handle_t *hdl;
939         __u64 mode = 0;
940         __u32 keysize = 0;
941         __u32 recsize = 0;
942         int rc;
943         ENTRY;
944
945         rc = __osd_obj2dnode(osd->od_os, oid, &dn);
946         if (rc == -EEXIST || rc == -ENOENT)
947                 RETURN_EXIT;
948
949         if (rc < 0)
950                 GOTO(log, rc);
951
952         if (!osd_object_is_zap(dn))
953                 GOTO(log, rc = 1);
954
955         rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
956         if (rc)
957                 GOTO(log, rc);
958
959         rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
960         sa_handle_destroy(hdl);
961         if (rc)
962                 GOTO(log, rc);
963
964         if (!S_ISREG(mode))
965                 GOTO(log, rc = 1);
966
967         zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
968         rc = -zap_cursor_retrieve(zc, za);
969         if (rc)
970                 /* Skip empty index object */
971                 GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
972
973         if (zc->zc_zap->zap_ismicro ||
974             !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
975                 GOTO(fini, rc = 1);
976
977         le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
978         keysize = le->le_name_numints * 8;
979         recsize = za->za_integer_length * za->za_num_integers;
980         if (likely(keysize && recsize))
981                 rc = osd_index_register(osd, fid, keysize, recsize);
982
983         GOTO(fini, rc);
984
985 fini:
986         zap_cursor_fini(zc);
987
988 log:
989         if (dn)
990                 osd_dnode_rele(dn);
991         if (rc < 0)
992                 CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
993                       osd_name(osd), PFID(fid), keysize, recsize, rc);
994         else if (!rc)
995                 CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
996                        osd_name(osd), PFID(fid), keysize, recsize);
997 }
998
999 static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
1000                               struct lustre_index_restore_unit *liru, void *buf,
1001                               int bufsize)
1002 {
1003         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1004         struct lu_fid *tgt_fid = &liru->liru_cfid;
1005         struct lu_fid bak_fid;
1006         int rc;
1007         ENTRY;
1008
1009         lustre_fid2lbx(buf, tgt_fid, bufsize);
1010         rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
1011                          sizeof(*zde) / 8, (void *)zde);
1012         if (rc)
1013                 GOTO(log, rc);
1014
1015         rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
1016         if (rc)
1017                 GOTO(log, rc);
1018
1019         /* The OI mapping for index may be invalid, since it will be
1020          * re-created, not update the OI mapping, just cache it in RAM. */
1021         rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
1022                                             liru->liru_clid);
1023         if (!rc)
1024                 rc = lustre_index_restore(env, &dev->od_dt_dev,
1025                                 &liru->liru_pfid, tgt_fid, &bak_fid,
1026                                 liru->liru_name, &dev->od_index_backup_list,
1027                                 &dev->od_lock, buf, bufsize);
1028         GOTO(log, rc);
1029
1030 log:
1031         CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
1032                osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
1033 }
1034
1035 /**
1036  * verify FID-in-LMA and OI entry for one object
1037  *
1038  * ios: Initial OI Scrub.
1039  */
1040 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
1041                             const struct lu_fid *fid, uint64_t parent,
1042                             uint64_t oid, const char *name,
1043                             enum osd_lf_flags flags)
1044 {
1045         struct lustre_scrub *scrub = &dev->od_scrub;
1046         struct scrub_file *sf = &scrub->os_file;
1047         struct lustre_mdt_attrs *lma = NULL;
1048         nvlist_t *nvbuf = NULL;
1049         struct lu_fid tfid;
1050         uint64_t oid2 = 0;
1051         __u64 flag = 0;
1052         int size = 0;
1053         int op = 0;
1054         int rc;
1055         ENTRY;
1056
1057         rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
1058         if (unlikely(rc == -ENOENT || rc == -EEXIST))
1059                 RETURN(0);
1060
1061         if (rc && rc != -ENODATA) {
1062                 CWARN("%s: initial OI scrub failed to get lma for %llu: "
1063                       "rc = %d\n", osd_name(dev), oid, rc);
1064
1065                 RETURN(rc);
1066         }
1067
1068         if (!rc) {
1069                 LASSERT(nvbuf != NULL);
1070                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1071                                                (uchar_t **)&lma, &size);
1072                 if (rc || size == 0) {
1073                         LASSERT(lma == NULL);
1074                         rc = -ENODATA;
1075                 } else {
1076                         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1077                         lustre_lma_swab(lma);
1078                         if (lma->lma_compat & LMAC_NOT_IN_OI) {
1079                                 nvlist_free(nvbuf);
1080                                 RETURN(0);
1081                         }
1082
1083                         if (lma->lma_compat & LMAC_IDX_BACKUP &&
1084                             osd_index_need_recreate(env, dev, oid)) {
1085                                 if (parent == dev->od_root) {
1086                                         lu_local_obj_fid(&tfid,
1087                                                          OSD_FS_ROOT_OID);
1088                                 } else {
1089                                         rc = osd_get_fid_by_oid(env, dev,
1090                                                                 parent, &tfid);
1091                                         if (rc) {
1092                                                 nvlist_free(nvbuf);
1093                                                 RETURN(rc);
1094                                         }
1095                                 }
1096
1097                                 rc = lustre_liru_new(
1098                                                 &dev->od_index_restore_list,
1099                                                 &tfid, &lma->lma_self_fid, oid,
1100                                                 name, strlen(name));
1101                                 nvlist_free(nvbuf);
1102                                 RETURN(rc);
1103                         }
1104
1105                         tfid = lma->lma_self_fid;
1106                         if (!(flags & OLF_NOT_BACKUP))
1107                                 osd_ios_index_register(env, dev, &tfid, oid);
1108                 }
1109                 nvlist_free(nvbuf);
1110         }
1111
1112         if (rc == -ENODATA) {
1113                 if (!fid) {
1114                         /* Skip the object without FID-in-LMA */
1115                         CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
1116                                osd_name(dev), oid);
1117
1118                         RETURN(0);
1119                 }
1120
1121                 LASSERT(!fid_is_zero(fid));
1122
1123                 tfid = *fid;
1124                 if (flags & OLF_IDX_IN_FID) {
1125                         LASSERT(dev->od_index >= 0);
1126
1127                         tfid.f_oid = dev->od_index;
1128                 }
1129         }
1130
1131         rc = osd_fid_lookup(env, dev, &tfid, &oid2);
1132         if (rc) {
1133                 if (rc != -ENOENT) {
1134                         CWARN("%s: initial OI scrub failed to lookup fid for "
1135                               DFID"=>%llu: rc = %d\n",
1136                               osd_name(dev), PFID(&tfid), oid, rc);
1137
1138                         RETURN(rc);
1139                 }
1140
1141                 flag = SF_RECREATED;
1142                 op = DTO_INDEX_INSERT;
1143         } else {
1144                 if (oid == oid2)
1145                         RETURN(0);
1146
1147                 flag = SF_INCONSISTENT;
1148                 op = DTO_INDEX_UPDATE;
1149         }
1150
1151         if (!(sf->sf_flags & flag)) {
1152                 scrub_file_reset(scrub, dev->od_uuid, flag);
1153                 rc = scrub_file_store(env, scrub);
1154                 if (rc)
1155                         RETURN(rc);
1156         }
1157
1158         rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
1159
1160         RETURN(rc > 0 ? 0 : rc);
1161 }
1162
1163 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
1164                              const char *name, uint64_t parent, uint64_t oid,
1165                              enum osd_lf_flags flags, bool is_dir)
1166 {
1167         int rc;
1168         ENTRY;
1169
1170         rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
1171         if (!rc && is_dir)
1172                 rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
1173                                       osd_ios_varfid_hd);
1174
1175         RETURN(rc);
1176 }
1177
1178 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
1179                           const char *name, uint64_t parent, uint64_t oid,
1180                           enum osd_lf_flags flags, bool is_dir)
1181 {
1182         struct lu_fid tfid;
1183         int rc;
1184         ENTRY;
1185
1186         /* skip any non-DFID format name */
1187         if (name[0] != '[')
1188                 RETURN(0);
1189
1190         /* skip the start '[' */
1191         sscanf(&name[1], SFID, RFID(&tfid));
1192         if (fid_is_sane(&tfid))
1193                 rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
1194         else
1195                 rc = -EIO;
1196
1197         RETURN(rc);
1198 }
1199
1200 /*
1201  * General scanner for the directories execpt /ROOT during initial OI scrub.
1202  * It scans the name entries under the given directory one by one. For each
1203  * entry, verifies its OI mapping via the given @handle_dirent.
1204  */
1205 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1206                               uint64_t parent, handle_dirent_t handle_dirent,
1207                               enum osd_lf_flags flags)
1208 {
1209         struct osd_thread_info *info = osd_oti_get(env);
1210         struct luz_direntry *zde = &info->oti_zde;
1211         zap_attribute_t *za = &info->oti_za;
1212         zap_cursor_t *zc = &info->oti_zc;
1213         int rc;
1214         ENTRY;
1215
1216         zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1217         rc = -zap_cursor_retrieve(zc, za);
1218         if (rc == -ENOENT)
1219                 zap_cursor_advance(zc);
1220         else if (rc)
1221                 GOTO(log, rc);
1222
1223         while (1) {
1224                 rc = -zap_cursor_retrieve(zc, za);
1225                 if (rc)
1226                         GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1227
1228                 /* skip the entry started with '.' */
1229                 if (likely(za->za_name[0] != '.')) {
1230                         rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1231                                         za->za_integer_length,
1232                                         sizeof(*zde) / za->za_integer_length,
1233                                         (void *)zde);
1234                         if (rc) {
1235                                 CWARN("%s: initial OI scrub failed to lookup "
1236                                       "%s under %llu: rc = %d\n",
1237                                       osd_name(dev), za->za_name, parent, rc);
1238                                 continue;
1239                         }
1240
1241                         rc = handle_dirent(env, dev, za->za_name, parent,
1242                                         zde->lzd_reg.zde_dnode, flags,
1243                                         S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1244                                         true : false);
1245                         CDEBUG(D_LFSCK, "%s: initial OI scrub handled %s under "
1246                                "%llu: rc = %d\n",
1247                                osd_name(dev), za->za_name, parent, rc);
1248                 }
1249
1250                 zap_cursor_advance(zc);
1251         }
1252
1253 log:
1254         if (rc)
1255                 CWARN("%s: initial OI scrub failed to scan the directory %llu: "
1256                       "rc = %d\n", osd_name(dev), parent, rc);
1257         zap_cursor_fini(zc);
1258
1259         return rc;
1260 }
1261
1262 /*
1263  * The scanner for /ROOT directory. It is not all the items under /ROOT will
1264  * be scanned during the initial OI scrub, instead, only the .lustre and the
1265  * sub-items under .lustre will be handled.
1266  */
1267 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1268                            uint64_t parent, handle_dirent_t handle_dirent,
1269                            enum osd_lf_flags flags)
1270 {
1271         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1272         const struct osd_lf_map *map;
1273         uint64_t oid;
1274         int rc;
1275         int rc1 = 0;
1276         ENTRY;
1277
1278         rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1279                             sizeof(*zde) / 8, (void *)zde);
1280         if (rc == -ENOENT) {
1281                 /* The .lustre directory is lost. That is not fatal. It can
1282                  * be re-created in the subsequent MDT start processing. */
1283                 RETURN(0);
1284         }
1285
1286         if (rc) {
1287                 CWARN("%s: initial OI scrub failed to find .lustre: "
1288                       "rc = %d\n", osd_name(dev), rc);
1289
1290                 RETURN(rc);
1291         }
1292
1293         oid = zde->lzd_reg.zde_dnode;
1294         rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1295                               dot_lustre_name, 0);
1296         if (rc)
1297                 RETURN(rc);
1298
1299         for (map = osd_dl_maps; map->olm_name; map++) {
1300                 rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1301                                     sizeof(*zde) / 8, (void *)zde);
1302                 if (rc) {
1303                         if (rc != -ENOENT)
1304                                 CWARN("%s: initial OI scrub failed to find the entry %s under .lustre: rc = %d\n",
1305                                       osd_name(dev), map->olm_name, rc);
1306                         else if (!fid_is_zero(&map->olm_fid))
1307                                 /* Try to remove the stale OI mapping. */
1308                                 osd_scrub_refresh_mapping(env, dev,
1309                                                 &map->olm_fid, 0,
1310                                                 DTO_INDEX_DELETE, true,
1311                                                 map->olm_name);
1312                         continue;
1313                 }
1314
1315                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1316                                       zde->lzd_reg.zde_dnode, map->olm_name,
1317                                       map->olm_flags);
1318                 if (rc)
1319                         rc1 = rc;
1320         }
1321
1322         RETURN(rc1);
1323 }
1324
1325 static void osd_initial_OI_scrub(const struct lu_env *env,
1326                                  struct osd_device *dev)
1327 {
1328         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1329         const struct osd_lf_map *map;
1330         int rc;
1331         ENTRY;
1332
1333         for (map = osd_lf_maps; map->olm_name; map++) {
1334                 rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1335                                     sizeof(*zde) / 8, (void *)zde);
1336                 if (rc) {
1337                         if (rc != -ENOENT)
1338                                 CWARN("%s: initial OI scrub failed "
1339                                       "to find the entry %s: rc = %d\n",
1340                                       osd_name(dev), map->olm_name, rc);
1341                         else if (!fid_is_zero(&map->olm_fid))
1342                                 /* Try to remove the stale OI mapping. */
1343                                 osd_scrub_refresh_mapping(env, dev,
1344                                                 &map->olm_fid, 0,
1345                                                 DTO_INDEX_DELETE, true,
1346                                                 map->olm_name);
1347                         continue;
1348                 }
1349
1350                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1351                                       zde->lzd_reg.zde_dnode, map->olm_name,
1352                                       map->olm_flags);
1353                 if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1354                         osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1355                                          map->olm_flags, map->olm_scan_dir,
1356                                          map->olm_handle_dirent);
1357         }
1358
1359         while (!list_empty(&dev->od_ios_list)) {
1360                 struct osd_ios_item *item;
1361
1362                 item = list_entry(dev->od_ios_list.next,
1363                                   struct osd_ios_item, oii_list);
1364                 list_del_init(&item->oii_list);
1365                 item->oii_scan_dir(env, dev, item->oii_parent,
1366                                    item->oii_handle_dirent, item->oii_flags);
1367                 OBD_FREE_PTR(item);
1368         }
1369
1370         if (!list_empty(&dev->od_index_restore_list)) {
1371                 char *buf;
1372
1373                 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1374                 if (!buf)
1375                         CERROR("%s: not enough RAM for rebuild index\n",
1376                                osd_name(dev));
1377
1378                 while (!list_empty(&dev->od_index_restore_list)) {
1379                         struct lustre_index_restore_unit *liru;
1380
1381                         liru = list_entry(dev->od_index_restore_list.next,
1382                                           struct lustre_index_restore_unit,
1383                                           liru_link);
1384                         list_del(&liru->liru_link);
1385                         if (buf)
1386                                 osd_index_restore(env, dev, liru, buf,
1387                                                   INDEX_BACKUP_BUFSIZE);
1388                         OBD_FREE(liru, liru->liru_len);
1389                 }
1390
1391                 if (buf)
1392                         OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
1393         }
1394
1395         EXIT;
1396 }
1397
1398 /* OI scrub start/stop */
1399
1400 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1401                     __u32 flags)
1402 {
1403         int rc;
1404         ENTRY;
1405
1406         if (dev->od_dt_dev.dd_rdonly)
1407                 RETURN(-EROFS);
1408
1409         /* od_otable_sem: prevent concurrent start/stop */
1410         down(&dev->od_otable_sem);
1411         rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1412         up(&dev->od_otable_sem);
1413
1414         RETURN(rc == -EALREADY ? 0 : rc);
1415 }
1416
1417 void osd_scrub_stop(struct osd_device *dev)
1418 {
1419         struct lustre_scrub *scrub = &dev->od_scrub;
1420         ENTRY;
1421
1422         /* od_otable_sem: prevent concurrent start/stop */
1423         down(&dev->od_otable_sem);
1424         spin_lock(&scrub->os_lock);
1425         scrub->os_paused = 1;
1426         spin_unlock(&scrub->os_lock);
1427         scrub_stop(scrub);
1428         up(&dev->od_otable_sem);
1429
1430         EXIT;
1431 }
1432
1433 /* OI scrub setup/cleanup */
1434
1435 static const char osd_scrub_name[] = "OI_scrub";
1436
1437 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev,
1438                     bool resetoi)
1439 {
1440         struct osd_thread_info *info = osd_oti_get(env);
1441         struct lustre_scrub *scrub = &dev->od_scrub;
1442         struct scrub_file *sf = &scrub->os_file;
1443         struct lu_fid *fid = &info->oti_fid;
1444         struct dt_object *obj;
1445         uint64_t oid;
1446         int rc = 0;
1447         bool dirty = false;
1448         ENTRY;
1449
1450         memcpy(dev->od_uuid.b,
1451                &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1452                sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1453         memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1454         init_rwsem(&scrub->os_rwsem);
1455         spin_lock_init(&scrub->os_lock);
1456         INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1457         scrub->os_name = osd_name(dev);
1458
1459         /* 'What the @fid is' is not imporatant, because the object
1460          * has no OI mapping, and only is visible inside the OSD.*/
1461         fid->f_seq = FID_SEQ_IGIF_MAX;
1462         if (dev->od_is_ost)
1463                 fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1464         else
1465                 fid->f_oid = dev->od_index + 1;
1466         fid->f_ver = 0;
1467         rc = osd_obj_find_or_create(env, dev, dev->od_root,
1468                                     osd_scrub_name, &oid, fid, false);
1469         if (rc)
1470                 RETURN(rc);
1471
1472         rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1473         if (rc)
1474                 RETURN(rc);
1475
1476         obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1477         if (IS_ERR_OR_NULL(obj))
1478                 RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1479
1480         obj->do_body_ops = &osd_body_scrub_ops;
1481         scrub->os_obj = obj;
1482         rc = scrub_file_load(env, scrub);
1483         if (rc == -ENOENT || rc == -EFAULT) {
1484                 scrub_file_init(scrub, dev->od_uuid);
1485                 dirty = true;
1486         } else if (rc < 0) {
1487                 GOTO(cleanup_obj, rc);
1488         } else {
1489                 if (!uuid_equal(&sf->sf_uuid, &dev->od_uuid)) {
1490                         CDEBUG(D_LFSCK,
1491                                "%s: UUID has been changed from %pU to %pU\n",
1492                                osd_name(dev), &sf->sf_uuid, &dev->od_uuid);
1493                         scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1494                         dirty = true;
1495                 } else if (sf->sf_status == SS_SCANNING) {
1496                         sf->sf_status = SS_CRASHED;
1497                         dirty = true;
1498                 }
1499
1500                 if ((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0) {
1501                         LCONSOLE_WARN("%s: invalid oi count %d, set it to %d\n",
1502                                       osd_name(dev), sf->sf_oi_count,
1503                                       osd_oi_count);
1504                         sf->sf_oi_count = osd_oi_count;
1505                         dirty = true;
1506                 }
1507         }
1508
1509         if (sf->sf_pos_last_checkpoint != 0)
1510                 scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1511         else
1512                 scrub->os_pos_current = 1;
1513
1514         if (dirty) {
1515                 rc = scrub_file_store(env, scrub);
1516                 if (rc)
1517                         GOTO(cleanup_obj, rc);
1518         }
1519
1520         /* Initialize OI files. */
1521         rc = osd_oi_init(env, dev, resetoi);
1522         if (rc < 0)
1523                 GOTO(cleanup_obj, rc);
1524
1525         if (!dev->od_dt_dev.dd_rdonly)
1526                 osd_initial_OI_scrub(env, dev);
1527
1528         if (!dev->od_dt_dev.dd_rdonly &&
1529             dev->od_auto_scrub_interval != AS_NEVER &&
1530             ((sf->sf_status == SS_PAUSED) ||
1531              (sf->sf_status == SS_CRASHED &&
1532               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1533                               SF_UPGRADE | SF_AUTO)) ||
1534              (sf->sf_status == SS_INIT &&
1535               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1536                               SF_UPGRADE))))
1537                 rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1538
1539         if (rc)
1540                 GOTO(cleanup_oi, rc);
1541
1542         RETURN(0);
1543
1544 cleanup_oi:
1545         osd_oi_fini(env, dev);
1546 cleanup_obj:
1547         dt_object_put_nocache(env, scrub->os_obj);
1548         scrub->os_obj = NULL;
1549
1550         return rc;
1551 }
1552
1553 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1554 {
1555         struct lustre_scrub *scrub = &dev->od_scrub;
1556
1557         LASSERT(!dev->od_otable_it);
1558
1559         if (scrub->os_obj) {
1560                 osd_scrub_stop(dev);
1561                 dt_object_put_nocache(env, scrub->os_obj);
1562                 scrub->os_obj = NULL;
1563         }
1564
1565         if (dev->od_oi_table)
1566                 osd_oi_fini(env, dev);
1567 }
1568
1569 /* object table based iteration APIs */
1570
1571 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1572                                        struct dt_object *dt, __u32 attr)
1573 {
1574         enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1575         enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1576         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1577         struct lustre_scrub *scrub = &dev->od_scrub;
1578         struct osd_otable_it *it;
1579         __u32 start = 0;
1580         int rc;
1581         ENTRY;
1582
1583         if (dev->od_dt_dev.dd_rdonly)
1584                 RETURN(ERR_PTR(-EROFS));
1585
1586         /* od_otable_sem: prevent concurrent init/fini */
1587         down(&dev->od_otable_sem);
1588         if (dev->od_otable_it)
1589                 GOTO(out, it = ERR_PTR(-EALREADY));
1590
1591         OBD_ALLOC_PTR(it);
1592         if (!it)
1593                 GOTO(out, it = ERR_PTR(-ENOMEM));
1594
1595         if (flags & DOIF_OUTUSED)
1596                 it->ooi_used_outside = 1;
1597
1598         if (flags & DOIF_RESET)
1599                 start |= SS_RESET;
1600
1601         if (valid & DOIV_ERROR_HANDLE) {
1602                 if (flags & DOIF_FAILOUT)
1603                         start |= SS_SET_FAILOUT;
1604                 else
1605                         start |= SS_CLEAR_FAILOUT;
1606         }
1607
1608         if (valid & DOIV_DRYRUN) {
1609                 if (flags & DOIF_DRYRUN)
1610                         start |= SS_SET_DRYRUN;
1611                 else
1612                         start |= SS_CLEAR_DRYRUN;
1613         }
1614
1615         /* XXX: dmu_object_next() does NOT find dnodes allocated
1616          *      in the current non-committed txg, so we force txg
1617          *      commit to find all existing dnodes ... */
1618         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1619
1620         dev->od_otable_it = it;
1621         it->ooi_dev = dev;
1622         rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1623         if (rc == -EALREADY) {
1624                 it->ooi_pos = 1;
1625         } else if (rc < 0) {
1626                 dev->od_otable_it = NULL;
1627                 OBD_FREE_PTR(it);
1628                 it = ERR_PTR(rc);
1629         } else {
1630                 it->ooi_pos = scrub->os_pos_current;
1631         }
1632
1633         GOTO(out, it);
1634
1635 out:
1636         up(&dev->od_otable_sem);
1637         return (struct dt_it *)it;
1638 }
1639
1640 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1641 {
1642         struct osd_otable_it *it = (struct osd_otable_it *)di;
1643         struct osd_device *dev = it->ooi_dev;
1644
1645         /* od_otable_sem: prevent concurrent init/fini */
1646         down(&dev->od_otable_sem);
1647         scrub_stop(&dev->od_scrub);
1648         LASSERT(dev->od_otable_it == it);
1649
1650         dev->od_otable_it = NULL;
1651         up(&dev->od_otable_sem);
1652         OBD_FREE_PTR(it);
1653 }
1654
/*
 * Positioning by key is a no-op for the object table iterator.
 *
 * \retval 0	always
 */
static int osd_otable_it_get(const struct lu_env *env,
			     struct dt_it *di, const struct dt_key *key)
{
	/* nothing to look up: @key is ignored */
	return 0;
}
1660
/**
 * The .put method of the otable iterator: intentionally empty.
 */
static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
{
}
1664
1665 static void osd_otable_it_preload(const struct lu_env *env,
1666                                   struct osd_otable_it *it)
1667 {
1668         struct osd_device *dev = it->ooi_dev;
1669         int rc;
1670
1671         /* can go negative on the very first access to the iterator
1672          * or if some non-Lustre objects were found */
1673         if (unlikely(it->ooi_prefetched < 0))
1674                 it->ooi_prefetched = 0;
1675
1676         if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
1677                 return;
1678
1679         if (it->ooi_prefetched_dnode == 0)
1680                 it->ooi_prefetched_dnode = it->ooi_pos;
1681
1682         while (it->ooi_prefetched < OTABLE_PREFETCH) {
1683                 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
1684                                       B_FALSE, 0);
1685                 if (rc)
1686                         break;
1687
1688                 osd_dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1689                                  0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1690                 it->ooi_prefetched++;
1691         }
1692 }
1693
1694 static inline int
1695 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1696 {
1697         spin_lock(&scrub->os_lock);
1698         if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1699             !scrub->os_running)
1700                 it->ooi_waiting = 0;
1701         else
1702                 it->ooi_waiting = 1;
1703         spin_unlock(&scrub->os_lock);
1704
1705         return !it->ooi_waiting;
1706 }
1707
1708 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1709 {
1710         struct osd_otable_it *it = (struct osd_otable_it *)di;
1711         struct osd_device *dev = it->ooi_dev;
1712         struct lustre_scrub *scrub = &dev->od_scrub;
1713         struct lustre_mdt_attrs *lma = NULL;
1714         nvlist_t *nvbuf = NULL;
1715         int rc, size = 0;
1716         bool locked;
1717         ENTRY;
1718
1719         LASSERT(it->ooi_user_ready);
1720         fid_zero(&it->ooi_fid);
1721
1722         if (unlikely(it->ooi_all_cached))
1723                 RETURN(1);
1724
1725 again:
1726         if (nvbuf) {
1727                 nvlist_free(nvbuf);
1728                 nvbuf = NULL;
1729                 lma = NULL;
1730                 size = 0;
1731         }
1732
1733         if (it->ooi_pos >= scrub->os_pos_current)
1734                 wait_var_event(scrub,
1735                                osd_otable_it_wakeup(scrub, it));
1736
1737         if (!scrub->os_running && !it->ooi_used_outside)
1738                 GOTO(out, rc = 1);
1739
1740         rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
1741         if (rc) {
1742                 if (unlikely(rc == -ESRCH)) {
1743                         it->ooi_all_cached = 1;
1744                         rc = 1;
1745                 }
1746
1747                 GOTO(out, rc);
1748         }
1749
1750         rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1751
1752         locked = false;
1753         if (!scrub->os_full_speed) {
1754                 spin_lock(&scrub->os_lock);
1755                 locked = true;
1756         }
1757         it->ooi_prefetched--;
1758         if (!scrub->os_full_speed) {
1759                 if (scrub->os_waiting) {
1760                         scrub->os_waiting = 0;
1761                         wake_up_var(scrub);
1762                 }
1763         }
1764         if (locked)
1765                 spin_unlock(&scrub->os_lock);
1766
1767         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1768                 goto again;
1769
1770         if (rc)
1771                 GOTO(out, rc);
1772
1773         LASSERT(nvbuf != NULL);
1774         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1775                                        (uchar_t **)&lma, &size);
1776         if (rc || size == 0)
1777                 /* It is either non-Lustre object or OSD internal object,
1778                  * ignore it, go ahead */
1779                 goto again;
1780
1781         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1782         lustre_lma_swab(lma);
1783         if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1784                      lma->lma_incompat & LMAI_AGENT))
1785                 goto again;
1786
1787         it->ooi_fid = lma->lma_self_fid;
1788
1789         GOTO(out, rc = 0);
1790
1791 out:
1792         if (nvbuf)
1793                 nvlist_free(nvbuf);
1794
1795         if (!rc && scrub->os_full_speed)
1796                 osd_otable_it_preload(env, it);
1797
1798         return rc;
1799 }
1800
1801 static struct dt_key *osd_otable_it_key(const struct lu_env *env,
1802                                         const struct dt_it *di)
1803 {
1804         return NULL;
1805 }
1806
1807 static int osd_otable_it_key_size(const struct lu_env *env,
1808                                   const struct dt_it *di)
1809 {
1810         return sizeof(__u64);
1811 }
1812
1813 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1814                              struct dt_rec *rec, __u32 attr)
1815 {
1816         struct osd_otable_it *it  = (struct osd_otable_it *)di;
1817         struct lu_fid *fid = (struct lu_fid *)rec;
1818
1819         *fid = it->ooi_fid;
1820         return 0;
1821 }
1822
1823 static __u64 osd_otable_it_store(const struct lu_env *env,
1824                                  const struct dt_it *di)
1825 {
1826         struct osd_otable_it *it = (struct osd_otable_it *)di;
1827
1828         return it->ooi_pos;
1829 }
1830
1831 /**
1832  * Set the OSD layer iteration start position as the specified hash.
1833  */
1834 static int osd_otable_it_load(const struct lu_env *env,
1835                               const struct dt_it *di, __u64 hash)
1836 {
1837         struct osd_otable_it *it = (struct osd_otable_it *)di;
1838         struct osd_device *dev = it->ooi_dev;
1839         struct lustre_scrub *scrub = &dev->od_scrub;
1840         int rc;
1841         ENTRY;
1842
1843         /* Forbid to set iteration position after iteration started. */
1844         if (it->ooi_user_ready)
1845                 RETURN(-EPERM);
1846
1847         if (hash > OSD_OTABLE_MAX_HASH)
1848                 hash = OSD_OTABLE_MAX_HASH;
1849
1850         /* The hash is the last checkpoint position,
1851          * we will start from the next one. */
1852         it->ooi_pos = hash + 1;
1853         it->ooi_prefetched = 0;
1854         it->ooi_prefetched_dnode = 0;
1855         it->ooi_user_ready = 1;
1856         if (!scrub->os_full_speed)
1857                 wake_up_var(scrub);
1858
1859         /* Unplug OSD layer iteration by the first next() call. */
1860         rc = osd_otable_it_next(env, (struct dt_it *)it);
1861
1862         RETURN(rc);
1863 }
1864
/**
 * The .key_rec method of the otable iterator: does nothing, \a key_rec is
 * left untouched.
 *
 * \retval 0	always
 */
static int osd_otable_it_key_rec(const struct lu_env *env,
				 const struct dt_it *di, void *key_rec)
{
	return 0;
}
1870
1871 const struct dt_index_operations osd_otable_ops = {
1872         .dio_it = {
1873                 .init     = osd_otable_it_init,
1874                 .fini     = osd_otable_it_fini,
1875                 .get      = osd_otable_it_get,
1876                 .put      = osd_otable_it_put,
1877                 .next     = osd_otable_it_next,
1878                 .key      = osd_otable_it_key,
1879                 .key_size = osd_otable_it_key_size,
1880                 .rec      = osd_otable_it_rec,
1881                 .store    = osd_otable_it_store,
1882                 .load     = osd_otable_it_load,
1883                 .key_rec  = osd_otable_it_key_rec,
1884         }
1885 };
1886
1887 /* high priority inconsistent items list APIs */
1888
1889 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1890                    const struct lu_fid *fid, uint64_t oid, bool insert)
1891 {
1892         struct lustre_scrub *scrub = &dev->od_scrub;
1893         struct osd_inconsistent_item *oii;
1894         bool wakeup = false;
1895         ENTRY;
1896
1897         osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1898         OBD_ALLOC_PTR(oii);
1899         if (unlikely(!oii))
1900                 RETURN(-ENOMEM);
1901
1902         INIT_LIST_HEAD(&oii->oii_list);
1903         oii->oii_cache.oic_dev = dev;
1904         oii->oii_cache.oic_fid = *fid;
1905         oii->oii_cache.oic_dnode = oid;
1906         oii->oii_insert = insert;
1907
1908         spin_lock(&scrub->os_lock);
1909         if (!scrub->os_running) {
1910                 spin_unlock(&scrub->os_lock);
1911                 OBD_FREE_PTR(oii);
1912                 RETURN(-EAGAIN);
1913         }
1914
1915         if (list_empty(&scrub->os_inconsistent_items))
1916                 wakeup = true;
1917         list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1918         spin_unlock(&scrub->os_lock);
1919
1920         if (wakeup)
1921                 wake_up_var(scrub);
1922
1923         RETURN(0);
1924 }
1925
1926 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1927                    uint64_t *oid)
1928 {
1929         struct lustre_scrub *scrub = &dev->od_scrub;
1930         struct osd_inconsistent_item *oii;
1931         int ret = -ENOENT;
1932         ENTRY;
1933
1934         spin_lock(&scrub->os_lock);
1935         list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1936                 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1937                         *oid = oii->oii_cache.oic_dnode;
1938                         ret = 0;
1939                         break;
1940                 }
1941         }
1942         spin_unlock(&scrub->os_lock);
1943
1944         RETURN(ret);
1945 }
1946
1947 typedef int (*scan_dir_helper_t)(const struct lu_env *env,
1948                                  struct osd_device *dev, uint64_t dir_oid,
1949                                  struct osd_zap_it *ozi);
1950
1951 static int osd_scan_dir(const struct lu_env *env, struct osd_device *dev,
1952                         uint64_t id, scan_dir_helper_t cb)
1953 {
1954         struct osd_zap_it *it;
1955         struct luz_direntry *zde;
1956         zap_attribute_t *za;
1957         int rc;
1958
1959         ENTRY;
1960
1961         OBD_SLAB_ALLOC_PTR_GFP(it, osd_zapit_cachep, GFP_NOFS);
1962         if (it == NULL)
1963                 RETURN(-ENOMEM);
1964
1965         rc = osd_zap_cursor_init(&it->ozi_zc, dev->od_os, id, 0);
1966         if (rc != 0)
1967                 GOTO(out, rc);
1968
1969         za = &it->ozi_za;
1970         zde = &it->ozi_zde;
1971         while (1) {
1972                 rc = -zap_cursor_retrieve(it->ozi_zc, za);
1973                 if (unlikely(rc)) {
1974                         if (rc == -ENOENT)
1975                                 rc = 0;
1976
1977                         break;
1978                 }
1979
1980                 if (name_is_dot_or_dotdot(za->za_name, strlen(za->za_name))) {
1981                         zap_cursor_advance(it->ozi_zc);
1982                         continue;
1983                 }
1984
1985                 strncpy(it->ozi_name, za->za_name, sizeof(it->ozi_name));
1986                 if (za->za_integer_length != 8) {
1987                         rc = -EIO;
1988                         break;
1989                 }
1990
1991                 rc = osd_zap_lookup(dev, it->ozi_zc->zc_zapobj, NULL,
1992                                     za->za_name, za->za_integer_length,
1993                                     sizeof(*zde) / za->za_integer_length, zde);
1994                 if (rc)
1995                         break;
1996
1997                 rc = cb(env, dev, id, it);
1998                 if (rc)
1999                         break;
2000
2001                 zap_cursor_advance(it->ozi_zc);
2002         }
2003         osd_zap_cursor_fini(it->ozi_zc);
2004
2005 out:
2006         OBD_SLAB_FREE_PTR(it, osd_zapit_cachep);
2007         RETURN(rc);
2008 }
2009
2010 static int osd_remove_ml_file(const struct lu_env *env, struct osd_device *dev,
2011                               uint64_t dir, uint64_t id, struct lu_fid *fid,
2012                               char *name)
2013 {
2014         struct osd_thread_info *info = osd_oti_get(env);
2015         struct dt_object *dt;
2016         struct osd_object *obj = NULL;
2017         dmu_tx_t *tx;
2018         sa_handle_t *hdl;
2019         uint64_t nlink;
2020         int rc;
2021
2022         rc = -sa_handle_get(dev->od_os, id, NULL, SA_HDL_PRIVATE, &hdl);
2023         if (rc)
2024                 RETURN(rc);
2025
2026         dt = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
2027         if (IS_ERR(dt))
2028                 RETURN(PTR_ERR(dt));
2029
2030         if (dt) {
2031                 obj = osd_dt_obj(dt);
2032                 down_read(&obj->oo_guard);
2033         }
2034
2035         rc = -sa_lookup(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink));
2036         if (rc)
2037                 GOTO(out, rc);
2038
2039         if (nlink <= 1) {
2040                 CERROR("%s: multi-link file O/%s/%s/%s has nlink %llu\n",
2041                        osd_name(dev), info->oti_seq_name, info->oti_dir_name,
2042                        name, nlink);
2043                 GOTO(out, rc = 0);
2044         }
2045
2046         tx = dmu_tx_create(dev->od_os);
2047         if (!tx) {
2048                 CERROR("%s: fail to create tx to remove multi-link file!\n",
2049                        osd_name(dev));
2050                 GOTO(out, rc = -ENOMEM);
2051         }
2052
2053         dmu_tx_hold_zap(tx, dir, FALSE, NULL);
2054         rc = -dmu_tx_assign(tx, TXG_WAIT);
2055         if (rc)
2056                 GOTO(abort, rc);
2057
2058         nlink--;
2059         rc = -sa_update(hdl, SA_ZPL_LINKS(dev), &nlink, sizeof(nlink), tx);
2060         if (rc)
2061                 GOTO(abort, rc);
2062
2063         rc = -zap_remove(dev->od_os, dir, name, tx);
2064         if (rc)
2065                 GOTO(abort, rc);
2066
2067         dmu_tx_commit(tx);
2068         GOTO(out, rc);
2069
2070 abort:
2071         dmu_tx_abort(tx);
2072
2073 out:
2074         if (dt) {
2075                 up_read(&obj->oo_guard);
2076                 dt_object_put_nocache(env, dt);
2077         }
2078
2079         sa_handle_destroy(hdl);
2080         RETURN(rc);
2081 }
2082
2083 static int osd_scan_ml_file(const struct lu_env *env, struct osd_device *dev,
2084                             uint64_t dir_oid, struct osd_zap_it *ozi)
2085 {
2086         struct osd_thread_info *info = osd_oti_get(env);
2087         struct lu_fid *fid = &info->oti_fid;
2088         struct ost_id *ostid = &info->oti_ostid;
2089         char name[32];
2090         u64 seq;
2091         int rc = 0;
2092
2093         ENTRY;
2094
2095         rc = osd_get_fid_by_oid(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode, fid);
2096         if (rc)
2097                 RETURN(rc);
2098
2099         seq = fid_seq(fid);
2100         fid_to_ostid(fid, ostid);
2101
2102         snprintf(name, sizeof(name), (fid_seq_is_rsvd(seq) ||
2103                                       fid_seq_is_mdt0(seq)) ? "%llu" : "%llx",
2104                                       fid_seq_is_idif(seq) ? 0 : seq);
2105         if (strcmp(info->oti_seq_name, name) != 0)
2106                 GOTO(fix, rc);
2107
2108         snprintf(name, sizeof(name), "d%d",
2109                 (int)ostid_id(ostid) % OSD_OST_MAP_SIZE);
2110         if (strcmp(info->oti_dir_name, name) != 0)
2111                 GOTO(fix, rc);
2112
2113         snprintf(name, sizeof(name), "%llu", ostid_id(ostid));
2114         if (strcmp(ozi->ozi_name, name) == 0)
2115                 RETURN(0);
2116
2117 fix:
2118         CDEBUG(D_LFSCK, "%s: the file O/%s/%s/%s is corrupted\n",
2119                osd_name(dev), info->oti_seq_name, info->oti_dir_name,
2120                ozi->ozi_name);
2121
2122         rc = osd_remove_ml_file(env, dev, dir_oid,
2123                                 ozi->ozi_zde.lzd_reg.zde_dnode, fid,
2124                                 ozi->ozi_name);
2125         RETURN(rc);
2126 }
2127
2128 static int osd_scan_ml_file_dir(const struct lu_env *env,
2129                                 struct osd_device *dev, uint64_t dir_oid,
2130                                 struct osd_zap_it *ozi)
2131 {
2132         struct osd_thread_info *info = osd_oti_get(env);
2133
2134         if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2135                 return 0;
2136
2137         info->oti_dir_name = ozi->ozi_name;
2138         return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
2139                             osd_scan_ml_file);
2140 }
2141
2142 static int osd_scan_ml_file_seq(const struct lu_env *env,
2143                                 struct osd_device *dev, uint64_t dir_oid,
2144                                 struct osd_zap_it *ozi)
2145 {
2146         struct osd_thread_info *info = osd_oti_get(env);
2147
2148         if (!S_ISDIR(cpu_to_le16(DTTOIF(ozi->ozi_zde.lzd_reg.zde_type))))
2149                 return 0;
2150
2151         info->oti_seq_name = ozi->ozi_name;
2152         return osd_scan_dir(env, dev, ozi->ozi_zde.lzd_reg.zde_dnode,
2153                             osd_scan_ml_file_dir);
2154 }
2155
2156 static int osd_scan_ml_file_main(const struct lu_env *env,
2157                                  struct osd_device *dev)
2158 {
2159         return osd_scan_dir(env, dev, dev->od_O_id, osd_scan_ml_file_seq);
2160 }