1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2017, Intel Corporation.
24  */
25 /*
26  * lustre/osd-zfs/osd_scrub.c
27  *
28  * Top-level entry points into osd module
29  *
30  * The OI scrub is used to rebuild the Object Index files when the MDT is
31  * restored from a file-level backup.
32  *
33  * The otable-based iterator scans ZFS objects to feed the upper-layer LFSCK.
34  *
35  * Author: Fan Yong <fan.yong@intel.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_LFSCK
39
40 #include <linux/kthread.h>
41 #include <uapi/linux/lustre/lustre_idl.h>
42 #include <lustre_disk.h>
43 #include <dt_object.h>
44 #include <linux/xattr.h>
45 #include <lustre_scrub.h>
46 #include <obd_class.h>
47 #include <lustre_nodemap.h>
48 #include <sys/dsl_dataset.h>
49
50 #include "osd_internal.h"
51
52 #define OSD_OTABLE_MAX_HASH             ((1ULL << 48) - 1)
53 #define OTABLE_PREFETCH                 256
54
55 #define DTO_INDEX_INSERT                1
56 #define DTO_INDEX_DELETE                2
57 #define DTO_INDEX_UPDATE                3
58
59 static inline bool osd_scrub_has_window(struct osd_otable_it *it)
60 {
61         return it->ooi_prefetched < OTABLE_PREFETCH;
62 }
63
64 /**
65  * Update/insert/delete the specified OI mapping (@fid => @oid) according to @ops.
66  *
67  * \retval   1, changed nothing
68  * \retval   0, changed successfully
69  * \retval -ve, on error
70  */
71 static int osd_scrub_refresh_mapping(const struct lu_env *env,
72                                      struct osd_device *dev,
73                                      const struct lu_fid *fid,
74                                      uint64_t oid, int ops,
75                                      bool force, const char *name)
76 {
77         struct osd_thread_info *info = osd_oti_get(env);
78         struct zpl_direntry *zde = &info->oti_zde.lzd_reg;
79         char *buf = info->oti_str;
80         dmu_tx_t *tx = NULL;
81         dnode_t *dn = NULL;
82         uint64_t zapid;
83         int rc;
84         ENTRY;
85
86         if (dev->od_scrub.os_file.sf_param & SP_DRYRUN && !force)
87                 GOTO(log, rc = 0);
88
89         tx = dmu_tx_create(dev->od_os);
90         if (!tx)
91                 GOTO(log, rc = -ENOMEM);
92
93         zapid = osd_get_name_n_idx(env, dev, fid, buf,
94                                    sizeof(info->oti_str), &dn);
95         osd_tx_hold_zap(tx, zapid, dn,
96                         ops == DTO_INDEX_INSERT ? TRUE : FALSE, NULL);
97         rc = -dmu_tx_assign(tx, TXG_WAIT);
98         if (rc) {
99                 dmu_tx_abort(tx);
100                 GOTO(log, rc);
101         }
102
103         switch (ops) {
104         case DTO_INDEX_UPDATE:
105                 zde->zde_pad = 0;
106                 zde->zde_dnode = oid;
107                 zde->zde_type = 0; /* The type in OI mapping is useless. */
108                 rc = -zap_update(dev->od_os, zapid, buf, 8, sizeof(*zde) / 8,
109                                  zde, tx);
110                 if (unlikely(rc == -ENOENT)) {
111                         /* Some unlink thread may have removed the OI mapping. */
112                         rc = 1;
113                 }
114                 break;
115         case DTO_INDEX_INSERT:
116                 zde->zde_pad = 0;
117                 zde->zde_dnode = oid;
118                 zde->zde_type = 0; /* The type in OI mapping is useless. */
119                 rc = osd_zap_add(dev, zapid, dn, buf, 8, sizeof(*zde) / 8,
120                                  zde, tx);
121                 if (unlikely(rc == -EEXIST))
122                         rc = 1;
123                 break;
124         case DTO_INDEX_DELETE:
125                 rc = osd_zap_remove(dev, zapid, dn, buf, tx);
126                 if (rc == -ENOENT) {
127                         /* It is normal that the unlink thread has removed the
128                          * OI mapping already. */
129                         rc = 1;
130                 }
131                 break;
132         default:
133                 LASSERTF(0, "Unexpected ops %d\n", ops);
134                 rc = -EINVAL;
135                 break;
136         }
137
138         dmu_tx_commit(tx);
139         GOTO(log, rc);
140
141 log:
142         CDEBUG(D_LFSCK, "%s: refresh OI map for scrub, op %d, force %s, "
143                DFID" => %llu (%s): rc = %d\n", osd_name(dev), ops,
144                force ? "yes" : "no", PFID(fid), oid, name ? name : "null", rc);
145
146         return rc;
147 }
148
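/*
 * Verify the OI mapping for one object reported by the scrub and, when the
 * mapping is missing or points at the wrong dnode, insert or update it via
 * osd_scrub_refresh_mapping().
 */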
149 static int
150 osd_scrub_check_update(const struct lu_env *env, struct osd_device *dev,
151                        const struct lu_fid *fid, uint64_t oid, int val)
152 {
153         struct lustre_scrub *scrub = &dev->od_scrub;
154         struct scrub_file *sf = &scrub->os_file;
155         struct osd_inconsistent_item *oii = NULL;
156         nvlist_t *nvbuf = NULL;
157         dnode_t *dn = NULL;
158         uint64_t oid2;
159         int ops = DTO_INDEX_UPDATE;
160         int rc;
161         ENTRY;
162
163         down_write(&scrub->os_rwsem);
164         scrub->os_new_checked++;
165         if (val < 0)
166                 GOTO(out, rc = val);
167
168         if (scrub->os_in_prior)
169                 oii = list_entry(scrub->os_inconsistent_items.next,
170                                  struct osd_inconsistent_item, oii_list);
171
172         if (oid < sf->sf_pos_latest_start && !oii)
173                 GOTO(out, rc = 0);
174
175         if (oii && oii->oii_insert) {
176                 ops = DTO_INDEX_INSERT;
177                 goto zget;
178         }
179
180         rc = osd_fid_lookup(env, dev, fid, &oid2);
181         if (rc) {
182                 if (rc != -ENOENT)
183                         GOTO(out, rc);
184
185                 ops = DTO_INDEX_INSERT;
186
187 zget:
188                 rc = __osd_obj2dnode(dev->od_os, oid, &dn);
189                 if (rc) {
190                         /* Someone else removed the object concurrently (race). */
191                         if (rc == -ENOENT || rc == -EEXIST)
192                                 rc = 0;
193                         GOTO(out, rc);
194                 }
195
196                 scrub->os_full_speed = 1;
197                 sf->sf_flags |= SF_INCONSISTENT;
198         } else if (oid == oid2) {
199                 GOTO(out, rc = 0);
200         } else {
201                 struct lustre_mdt_attrs *lma = NULL;
202                 int size;
203
204                 rc = __osd_xattr_load_by_oid(dev, oid2, &nvbuf);
205                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
206                         goto update;
207                 if (rc)
208                         GOTO(out, rc);
209
210                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
211                                                (uchar_t **)&lma, &size);
212                 if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
213                         goto update;
214                 if (rc)
215                         GOTO(out, rc);
216
217                 lustre_lma_swab(lma);
218                 if (unlikely(lu_fid_eq(&lma->lma_self_fid, fid))) {
219                         CDEBUG(D_LFSCK, "%s: the FID "DFID" is used by "
220                                "two objects: %llu and %llu (in OI)\n",
221                                osd_name(dev), PFID(fid), oid, oid2);
222
223                         GOTO(out, rc = -EEXIST);
224                 }
225
226 update:
227                 scrub->os_full_speed = 1;
228                 sf->sf_flags |= SF_INCONSISTENT;
229         }
230
231         rc = osd_scrub_refresh_mapping(env, dev, fid, oid, ops, false, NULL);
232         if (!rc) {
233                 if (scrub->os_in_prior)
234                         sf->sf_items_updated_prior++;
235                 else
236                         sf->sf_items_updated++;
237         }
238
239         GOTO(out, rc);
240
241 out:
242         if (nvbuf)
243                 nvlist_free(nvbuf);
244
245         if (rc < 0) {
246                 sf->sf_items_failed++;
247                 if (sf->sf_pos_first_inconsistent == 0 ||
248                     sf->sf_pos_first_inconsistent > oid)
249                         sf->sf_pos_first_inconsistent = oid;
250         } else {
251                 rc = 0;
252         }
253
254         /* A conflicting unlink may happen during the OI scrub;
255          * if it does, remove the newly added OI mapping. */
256         if (ops == DTO_INDEX_INSERT && dn && dn->dn_free_txg)
257                 osd_scrub_refresh_mapping(env, dev, fid, oid,
258                                           DTO_INDEX_DELETE, false, NULL);
259         up_write(&scrub->os_rwsem);
260
261         if (dn)
262                 osd_dnode_rele(dn);
263
264         if (oii) {
265                 spin_lock(&scrub->os_lock);
266                 if (likely(!list_empty(&oii->oii_list)))
267                         list_del(&oii->oii_list);
268                 spin_unlock(&scrub->os_lock);
269                 OBD_FREE_PTR(oii);
270         }
271
272         RETURN(sf->sf_param & SP_FAILOUT ? rc : 0);
273 }
274
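/*
 * Prepare a new scrub pass: apply the start flags (failout/dryrun/reset) to
 * the on-disk scrub file, choose the starting position, mark the status as
 * SS_SCANNING, store the scrub file and flag the scrub thread as running.
 */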
275 static int osd_scrub_prep(const struct lu_env *env, struct osd_device *dev)
276 {
277         struct lustre_scrub *scrub = &dev->od_scrub;
278         struct ptlrpc_thread *thread = &scrub->os_thread;
279         struct scrub_file *sf = &scrub->os_file;
280         __u32 flags = scrub->os_start_flags;
281         int rc;
282         bool drop_dryrun = false;
283         ENTRY;
284
285         CDEBUG(D_LFSCK, "%s: OI scrub prep, flags = 0x%x\n",
286                scrub->os_name, flags);
287
288         down_write(&scrub->os_rwsem);
289         if (flags & SS_SET_FAILOUT)
290                 sf->sf_param |= SP_FAILOUT;
291         else if (flags & SS_CLEAR_FAILOUT)
292                 sf->sf_param &= ~SP_FAILOUT;
293
294         if (flags & SS_SET_DRYRUN) {
295                 sf->sf_param |= SP_DRYRUN;
296         } else if (flags & SS_CLEAR_DRYRUN && sf->sf_param & SP_DRYRUN) {
297                 sf->sf_param &= ~SP_DRYRUN;
298                 drop_dryrun = true;
299         }
300
301         if (flags & SS_RESET)
302                 scrub_file_reset(scrub, dev->od_uuid, 0);
303
304         scrub->os_partial_scan = 0;
305         if (flags & SS_AUTO_FULL) {
306                 scrub->os_full_speed = 1;
307                 sf->sf_flags |= SF_AUTO;
308         } else if (sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
309                                    SF_UPGRADE)) {
310                 scrub->os_full_speed = 1;
311         } else {
312                 scrub->os_full_speed = 0;
313         }
314
315         spin_lock(&scrub->os_lock);
316         scrub->os_in_prior = 0;
317         scrub->os_waiting = 0;
318         scrub->os_paused = 0;
319         scrub->os_in_join = 0;
320         scrub->os_full_scrub = 0;
321         spin_unlock(&scrub->os_lock);
322         scrub->os_new_checked = 0;
323         if (drop_dryrun && sf->sf_pos_first_inconsistent != 0)
324                 sf->sf_pos_latest_start = sf->sf_pos_first_inconsistent;
325         else if (sf->sf_pos_last_checkpoint != 0)
326                 sf->sf_pos_latest_start = sf->sf_pos_last_checkpoint + 1;
327         else
328                 sf->sf_pos_latest_start = 1;
329
330         scrub->os_pos_current = sf->sf_pos_latest_start;
331         sf->sf_status = SS_SCANNING;
332         sf->sf_time_latest_start = cfs_time_current_sec();
333         sf->sf_time_last_checkpoint = sf->sf_time_latest_start;
334         sf->sf_pos_last_checkpoint = sf->sf_pos_latest_start - 1;
335         rc = scrub_file_store(env, scrub);
336         if (!rc) {
337                 spin_lock(&scrub->os_lock);
338                 thread_set_flags(thread, SVC_RUNNING);
339                 spin_unlock(&scrub->os_lock);
340                 wake_up_all(&thread->t_ctl_waitq);
341         }
342         up_write(&scrub->os_rwsem);
343
344         RETURN(rc);
345 }
346
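/*
 * Finish the scrub pass: fold the per-pass counters into the scrub file,
 * record the final status (completed, stopped, paused or failed) and the
 * accumulated run time, then store the scrub file.
 */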
347 static int osd_scrub_post(const struct lu_env *env, struct osd_device *dev,
348                           int result)
349 {
350         struct lustre_scrub *scrub = &dev->od_scrub;
351         struct scrub_file *sf = &scrub->os_file;
352         int rc;
353         ENTRY;
354
355         CDEBUG(D_LFSCK, "%s: OI scrub post with result = %d\n",
356                scrub->os_name, result);
357
358         down_write(&scrub->os_rwsem);
359         spin_lock(&scrub->os_lock);
360         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
361         spin_unlock(&scrub->os_lock);
362         if (scrub->os_new_checked > 0) {
363                 sf->sf_items_checked += scrub->os_new_checked;
364                 scrub->os_new_checked = 0;
365                 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
366         }
367         sf->sf_time_last_checkpoint = cfs_time_current_sec();
368         if (result > 0) {
369                 sf->sf_status = SS_COMPLETED;
370                 if (!(sf->sf_param & SP_DRYRUN)) {
371                         memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
372                         sf->sf_flags &= ~(SF_RECREATED | SF_INCONSISTENT |
373                                           SF_UPGRADE | SF_AUTO);
374                 }
375                 sf->sf_time_last_complete = sf->sf_time_last_checkpoint;
376                 sf->sf_success_count++;
377         } else if (result == 0) {
378                 if (scrub->os_paused)
379                         sf->sf_status = SS_PAUSED;
380                 else
381                         sf->sf_status = SS_STOPPED;
382         } else {
383                 sf->sf_status = SS_FAILED;
384         }
385         sf->sf_run_time += cfs_duration_sec(cfs_time_current() + HALF_SEC -
386                                             scrub->os_time_last_checkpoint);
387         rc = scrub_file_store(env, scrub);
388         up_write(&scrub->os_rwsem);
389
390         RETURN(rc < 0 ? rc : result);
391 }
392
393 /* iteration engine */
394
395 static inline int
396 osd_scrub_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
397 {
398         spin_lock(&scrub->os_lock);
399         if (osd_scrub_has_window(it) ||
400             !list_empty(&scrub->os_inconsistent_items) ||
401             it->ooi_waiting || !thread_is_running(&scrub->os_thread))
402                 scrub->os_waiting = 0;
403         else
404                 scrub->os_waiting = 1;
405         spin_unlock(&scrub->os_lock);
406
407         return !scrub->os_waiting;
408 }
409
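/*
 * Pick the next object for the scrub thread to check: inconsistent items
 * queued by the OSD are handled first, otherwise advance through the dnodes
 * with dmu_object_next() and return the FID found in the object's LMA xattr.
 */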
410 static int osd_scrub_next(const struct lu_env *env, struct osd_device *dev,
411                           struct lu_fid *fid, uint64_t *oid)
412 {
413         struct l_wait_info lwi = { 0 };
414         struct lustre_scrub *scrub = &dev->od_scrub;
415         struct ptlrpc_thread *thread = &scrub->os_thread;
416         struct osd_otable_it *it = dev->od_otable_it;
417         struct lustre_mdt_attrs *lma = NULL;
418         nvlist_t *nvbuf = NULL;
419         int size = 0;
420         int rc = 0;
421         ENTRY;
422
423         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_DELAY) && cfs_fail_val > 0) {
424                 lwi = LWI_TIMEOUT(cfs_time_seconds(cfs_fail_val), NULL, NULL);
425                 if (likely(lwi.lwi_timeout > 0)) {
426                         l_wait_event(thread->t_ctl_waitq,
427                                 !list_empty(&scrub->os_inconsistent_items) ||
428                                 !thread_is_running(thread),
429                                 &lwi);
430                         if (unlikely(!thread_is_running(thread)))
431                                 RETURN(SCRUB_NEXT_EXIT);
432                 }
433         }
434
435         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_CRASH)) {
436                 spin_lock(&scrub->os_lock);
437                 thread_set_flags(thread, SVC_STOPPING);
438                 spin_unlock(&scrub->os_lock);
439                 RETURN(SCRUB_NEXT_CRASH);
440         }
441
442         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_SCRUB_FATAL))
443                 RETURN(SCRUB_NEXT_FATAL);
444
445 again:
446         if (nvbuf) {
447                 nvlist_free(nvbuf);
448                 nvbuf = NULL;
449                 lma = NULL;
450         }
451
452         if (!list_empty(&scrub->os_inconsistent_items)) {
453                 spin_lock(&scrub->os_lock);
454                 if (likely(!list_empty(&scrub->os_inconsistent_items))) {
455                         struct osd_inconsistent_item *oii;
456
457                         oii = list_entry(scrub->os_inconsistent_items.next,
458                                 struct osd_inconsistent_item, oii_list);
459                         *fid = oii->oii_cache.oic_fid;
460                         *oid = oii->oii_cache.oic_dnode;
461                         scrub->os_in_prior = 1;
462                         spin_unlock(&scrub->os_lock);
463
464                         GOTO(out, rc = 0);
465                 }
466                 spin_unlock(&scrub->os_lock);
467         }
468
469         if (!scrub->os_full_speed && !osd_scrub_has_window(it)) {
470                 memset(&lwi, 0, sizeof(lwi));
471                 l_wait_event(thread->t_ctl_waitq,
472                              osd_scrub_wakeup(scrub, it),
473                              &lwi);
474         }
475
476         if (unlikely(!thread_is_running(thread)))
477                 GOTO(out, rc = SCRUB_NEXT_EXIT);
478
479         rc = -dmu_object_next(dev->od_os, &scrub->os_pos_current, B_FALSE, 0);
480         if (rc)
481                 GOTO(out, rc = (rc == -ESRCH ? SCRUB_NEXT_BREAK : rc));
482
483         rc = __osd_xattr_load_by_oid(dev, scrub->os_pos_current, &nvbuf);
484         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
485                 goto again;
486
487         if (rc)
488                 GOTO(out, rc);
489
490         LASSERT(nvbuf != NULL);
491         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
492                                        (uchar_t **)&lma, &size);
493         if (!rc) {
494                 lustre_lma_swab(lma);
495                 if (likely(!(lma->lma_compat & LMAC_NOT_IN_OI) &&
496                            !(lma->lma_incompat & LMAI_AGENT))) {
497                         *fid = lma->lma_self_fid;
498                         *oid = scrub->os_pos_current;
499
500                         GOTO(out, rc = 0);
501                 }
502         }
503
504         if (!scrub->os_full_speed) {
505                 spin_lock(&scrub->os_lock);
506                 it->ooi_prefetched++;
507                 if (it->ooi_waiting) {
508                         it->ooi_waiting = 0;
509                         wake_up_all(&thread->t_ctl_waitq);
510                 }
511                 spin_unlock(&scrub->os_lock);
512         }
513
514         goto again;
515
516 out:
517         if (nvbuf)
518                 nvlist_free(nvbuf);
519
520         return rc;
521 }
522
523 static int osd_scrub_exec(const struct lu_env *env, struct osd_device *dev,
524                           const struct lu_fid *fid, uint64_t oid, int rc)
525 {
526         struct lustre_scrub *scrub = &dev->od_scrub;
527         struct ptlrpc_thread *thread = &scrub->os_thread;
528         struct osd_otable_it *it = dev->od_otable_it;
529
530         rc = osd_scrub_check_update(env, dev, fid, oid, rc);
531         if (!scrub->os_in_prior) {
532                 if (!scrub->os_full_speed) {
533                         spin_lock(&scrub->os_lock);
534                         it->ooi_prefetched++;
535                         if (it->ooi_waiting) {
536                                 it->ooi_waiting = 0;
537                                 wake_up_all(&thread->t_ctl_waitq);
538                         }
539                         spin_unlock(&scrub->os_lock);
540                 }
541         } else {
542                 scrub->os_in_prior = 0;
543         }
544
545         if (rc)
546                 return rc;
547
548         rc = scrub_checkpoint(env, scrub);
549         if (rc) {
550                 CDEBUG(D_LFSCK, "%s: fail to checkpoint, pos = %llu: "
551                        "rc = %d\n", scrub->os_name, scrub->os_pos_current, rc);
552                 /* Continue, as long as the scrub itself can go ahead. */
553         }
554
555         return 0;
556 }
557
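/*
 * Main loop of the OI scrub thread: prepare the scrub file, iterate over the
 * objects with osd_scrub_next()/osd_scrub_exec(), then post the final status
 * and drain any remaining inconsistent items.
 */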
558 static int osd_scrub_main(void *args)
559 {
560         struct lu_env env;
561         struct osd_device *dev = (struct osd_device *)args;
562         struct lustre_scrub *scrub = &dev->od_scrub;
563         struct ptlrpc_thread *thread = &scrub->os_thread;
564         struct lu_fid *fid;
565         uint64_t oid;
566         int rc = 0;
567         ENTRY;
568
569         rc = lu_env_init(&env, LCT_LOCAL | LCT_DT_THREAD);
570         if (rc) {
571                 CDEBUG(D_LFSCK, "%s: OI scrub fail to init env: rc = %d\n",
572                        scrub->os_name, rc);
573                 GOTO(noenv, rc);
574         }
575
576         rc = osd_scrub_prep(&env, dev);
577         if (rc) {
578                 CDEBUG(D_LFSCK, "%s: OI scrub fail to scrub prep: rc = %d\n",
579                        scrub->os_name, rc);
580                 GOTO(out, rc);
581         }
582
583         if (!scrub->os_full_speed) {
584                 struct l_wait_info lwi = { 0 };
585                 struct osd_otable_it *it = dev->od_otable_it;
586
587                 l_wait_event(thread->t_ctl_waitq,
588                              it->ooi_user_ready || !thread_is_running(thread),
589                              &lwi);
590                 if (unlikely(!thread_is_running(thread)))
591                         GOTO(post, rc = 0);
592
593                 scrub->os_pos_current = it->ooi_pos;
594         }
595
596         CDEBUG(D_LFSCK, "%s: OI scrub start, flags = 0x%x, pos = %llu\n",
597                scrub->os_name, scrub->os_start_flags,
598                scrub->os_pos_current);
599
600         fid = &osd_oti_get(&env)->oti_fid;
601         while (!rc && thread_is_running(thread)) {
602                 rc = osd_scrub_next(&env, dev, fid, &oid);
603                 switch (rc) {
604                 case SCRUB_NEXT_EXIT:
605                         GOTO(post, rc = 0);
606                 case SCRUB_NEXT_CRASH:
607                         spin_lock(&scrub->os_lock);
608                         thread_set_flags(&scrub->os_thread, SVC_STOPPING);
609                         spin_unlock(&scrub->os_lock);
610                         GOTO(out, rc = -EINVAL);
611                 case SCRUB_NEXT_FATAL:
612                         GOTO(post, rc = -EINVAL);
613                 case SCRUB_NEXT_BREAK:
614                         GOTO(post, rc = 1);
615                 }
616
617                 rc = osd_scrub_exec(&env, dev, fid, oid, rc);
618         }
619
620         GOTO(post, rc);
621
622 post:
623         rc = osd_scrub_post(&env, dev, rc);
624         CDEBUG(D_LFSCK, "%s: OI scrub: stop, pos = %llu: rc = %d\n",
625                scrub->os_name, scrub->os_pos_current, rc);
626
627 out:
628         while (!list_empty(&scrub->os_inconsistent_items)) {
629                 struct osd_inconsistent_item *oii;
630
631                 oii = list_entry(scrub->os_inconsistent_items.next,
632                                  struct osd_inconsistent_item, oii_list);
633                 list_del_init(&oii->oii_list);
634                 OBD_FREE_PTR(oii);
635         }
636
637         lu_env_fini(&env);
638
639 noenv:
640         spin_lock(&scrub->os_lock);
641         thread_set_flags(thread, SVC_STOPPED);
642         wake_up_all(&thread->t_ctl_waitq);
643         spin_unlock(&scrub->os_lock);
644         return rc;
645 }
646
647 /* initial OI scrub */
648
649 struct osd_lf_map;
650
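/* Per-dirent callback: verify/repair the OI mapping of one name entry. */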
651 typedef int (*handle_dirent_t)(const struct lu_env *, struct osd_device *,
652                                const char *, uint64_t, uint64_t,
653                                enum osd_lf_flags, bool);
654 static int osd_ios_varfid_hd(const struct lu_env *, struct osd_device *,
655                              const char *, uint64_t, uint64_t,
656                              enum osd_lf_flags, bool);
657 static int osd_ios_uld_hd(const struct lu_env *, struct osd_device *,
658                           const char *, uint64_t, uint64_t,
659                           enum osd_lf_flags, bool);
660
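/* Per-directory scanner: walk one directory and apply the dirent handler. */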
661 typedef int (*scan_dir_t)(const struct lu_env *, struct osd_device *,
662                           uint64_t, handle_dirent_t, enum osd_lf_flags);
663 static int osd_ios_general_sd(const struct lu_env *, struct osd_device *,
664                               uint64_t, handle_dirent_t, enum osd_lf_flags);
665 static int osd_ios_ROOT_sd(const struct lu_env *, struct osd_device *,
666                            uint64_t, handle_dirent_t, enum osd_lf_flags);
667
668 struct osd_lf_map {
669         char                    *olm_name;
670         struct lu_fid            olm_fid;
671         enum osd_lf_flags        olm_flags;
672         scan_dir_t               olm_scan_dir;
673         handle_dirent_t          olm_handle_dirent;
674 };
675
676 /* Newly introduced local files should be added to this list in the future. */
677 static const struct osd_lf_map osd_lf_maps[] = {
678         /* CONFIGS */
679         {
680                 .olm_name               = MOUNT_CONFIGS_DIR,
681                 .olm_fid                = {
682                         .f_seq  = FID_SEQ_LOCAL_FILE,
683                         .f_oid  = MGS_CONFIGS_OID,
684                 },
685                 .olm_flags              = OLF_SCAN_SUBITEMS,
686                 .olm_scan_dir           = osd_ios_general_sd,
687                 .olm_handle_dirent      = osd_ios_varfid_hd,
688         },
689
690         /* NIDTBL_VERSIONS */
691         {
692                 .olm_name               = MGS_NIDTBL_DIR,
693                 .olm_flags              = OLF_SCAN_SUBITEMS,
694                 .olm_scan_dir           = osd_ios_general_sd,
695                 .olm_handle_dirent      = osd_ios_varfid_hd,
696         },
697
698         /* PENDING */
699         {
700                 .olm_name               = "PENDING",
701         },
702
703         /* ROOT */
704         {
705                 .olm_name               = "ROOT",
706                 .olm_fid                = {
707                         .f_seq  = FID_SEQ_ROOT,
708                         .f_oid  = FID_OID_ROOT,
709                 },
710                 .olm_flags              = OLF_SCAN_SUBITEMS,
711                 .olm_scan_dir           = osd_ios_ROOT_sd,
712         },
713
714         /* fld */
715         {
716                 .olm_name               = "fld",
717                 .olm_fid                = {
718                         .f_seq  = FID_SEQ_LOCAL_FILE,
719                         .f_oid  = FLD_INDEX_OID,
720                 },
721         },
722
723         /* changelog_catalog */
724         {
725                 .olm_name               = CHANGELOG_CATALOG,
726         },
727
728         /* changelog_users */
729         {
730                 .olm_name               = CHANGELOG_USERS,
731         },
732
733         /* quota_master */
734         {
735                 .olm_name               = QMT_DIR,
736                 .olm_flags              = OLF_SCAN_SUBITEMS,
737                 .olm_scan_dir           = osd_ios_general_sd,
738                 .olm_handle_dirent      = osd_ios_varfid_hd,
739         },
740
741         /* quota_slave */
742         {
743                 .olm_name               = QSD_DIR,
744                 .olm_flags              = OLF_SCAN_SUBITEMS,
745                 .olm_scan_dir           = osd_ios_general_sd,
746                 .olm_handle_dirent      = osd_ios_varfid_hd,
747         },
748
749         /* LFSCK */
750         {
751                 .olm_name               = LFSCK_DIR,
752                 .olm_flags              = OLF_SCAN_SUBITEMS,
753                 .olm_scan_dir           = osd_ios_general_sd,
754                 .olm_handle_dirent      = osd_ios_varfid_hd,
755         },
756
757         /* lfsck_bookmark */
758         {
759                 .olm_name               = LFSCK_BOOKMARK,
760         },
761
762         /* lfsck_layout */
763         {
764                 .olm_name               = LFSCK_LAYOUT,
765         },
766
767         /* lfsck_namespace */
768         {
769                 .olm_name               = LFSCK_NAMESPACE,
770         },
771
772         /* OSP update logs update_log{_dir} use f_seq = FID_SEQ_UPDATE_LOG{_DIR}
773          * and f_oid = index for their log files.  See lu_update_log{_dir}_fid()
774          * for more details. */
775
776         /* update_log */
777         {
778                 .olm_name               = "update_log",
779                 .olm_fid                = {
780                         .f_seq  = FID_SEQ_UPDATE_LOG,
781                 },
782                 .olm_flags              = OLF_IDX_IN_FID,
783         },
784
785         /* update_log_dir */
786         {
787                 .olm_name               = "update_log_dir",
788                 .olm_fid        = {
789                         .f_seq  = FID_SEQ_UPDATE_LOG_DIR,
790                 },
791                 .olm_flags              = OLF_SCAN_SUBITEMS | OLF_IDX_IN_FID,
792                 .olm_scan_dir           = osd_ios_general_sd,
793                 .olm_handle_dirent      = osd_ios_uld_hd,
794         },
795
796         /* hsm_actions */
797         {
798                 .olm_name               = HSM_ACTIONS,
799         },
800
801         /* nodemap */
802         {
803                 .olm_name               = LUSTRE_NODEMAP_NAME,
804         },
805
806         {
807                 .olm_name               = NULL
808         }
809 };
810
811 /* Newly introduced files under .lustre/ should be added to this list in the future. */
812 static const struct osd_lf_map osd_dl_maps[] = {
813         /* .lustre/fid */
814         {
815                 .olm_name               = "fid",
816                 .olm_fid                = {
817                         .f_seq  = FID_SEQ_DOT_LUSTRE,
818                         .f_oid  = FID_OID_DOT_LUSTRE_OBF,
819                 },
820         },
821
822         /* .lustre/lost+found */
823         {
824                 .olm_name               = "lost+found",
825                 .olm_fid                = {
826                         .f_seq  = FID_SEQ_DOT_LUSTRE,
827                         .f_oid  = FID_OID_DOT_LUSTRE_LPF,
828                 },
829         },
830
831         {
832                 .olm_name               = NULL
833         }
834 };
835
836 struct osd_ios_item {
837         struct list_head        oii_list;
838         uint64_t                oii_parent;
839         enum osd_lf_flags       oii_flags;
840         scan_dir_t              oii_scan_dir;
841         handle_dirent_t         oii_handle_dirent;
842 };
843
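/* Queue a directory so that the initial OI scrub will scan it later. */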
844 static int osd_ios_new_item(struct osd_device *dev, uint64_t parent,
845                             enum osd_lf_flags flags, scan_dir_t scan_dir,
846                             handle_dirent_t handle_dirent)
847 {
848         struct osd_ios_item *item;
849
850         OBD_ALLOC_PTR(item);
851         if (!item) {
852                 CWARN("%s: initial OI scrub failed to add item for %llu\n",
853                       osd_name(dev), parent);
854                 return -ENOMEM;
855         }
856
857         INIT_LIST_HEAD(&item->oii_list);
858         item->oii_parent = parent;
859         item->oii_flags = flags;
860         item->oii_scan_dir = scan_dir;
861         item->oii_handle_dirent = handle_dirent;
862         list_add_tail(&item->oii_list, &dev->od_ios_list);
863
864         return 0;
865 }
866
867 /**
868  * Verify the FID-in-LMA and the OI entry for one object.
869  *
870  * ios: Initial OI Scrub.
871  */
872 static int osd_ios_scan_one(const struct lu_env *env, struct osd_device *dev,
873                             const struct lu_fid *fid, uint64_t parent,
874                             uint64_t oid, const char *name,
875                             enum osd_lf_flags flags)
876 {
877         struct lustre_scrub *scrub = &dev->od_scrub;
878         struct scrub_file *sf = &scrub->os_file;
879         struct lustre_mdt_attrs *lma = NULL;
880         nvlist_t *nvbuf = NULL;
881         struct lu_fid tfid;
882         uint64_t oid2 = 0;
883         __u64 flag = 0;
884         int size = 0;
885         int op = 0;
886         int rc;
887         ENTRY;
888
889         rc = __osd_xattr_load_by_oid(dev, oid, &nvbuf);
890         if (unlikely(rc == -ENOENT || rc == -EEXIST))
891                 RETURN(0);
892
893         if (rc && rc != -ENODATA) {
894                 CWARN("%s: initial OI scrub failed to get lma for %llu: "
895                       "rc = %d\n", osd_name(dev), oid, rc);
896
897                 RETURN(rc);
898         }
899
900         if (!rc) {
901                 LASSERT(nvbuf != NULL);
902                 rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
903                                                (uchar_t **)&lma, &size);
904                 if (rc || size == 0) {
905                         LASSERT(lma == NULL);
906                         rc = -ENODATA;
907                 } else {
908                         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
909                         lustre_lma_swab(lma);
910                         if (lma->lma_compat & LMAC_NOT_IN_OI) {
911                                 nvlist_free(nvbuf);
912                                 RETURN(0);
913                         }
914
915                         tfid = lma->lma_self_fid;
916                 }
917                 nvlist_free(nvbuf);
918         }
919
920         if (rc == -ENODATA) {
921                 if (!fid) {
922                         /* Skip the object without FID-in-LMA */
923                         CDEBUG(D_LFSCK, "%s: %llu has no FID-in-LMA, skip it\n",
924                                osd_name(dev), oid);
925
926                         RETURN(0);
927                 }
928
929                 LASSERT(!fid_is_zero(fid));
930
931                 tfid = *fid;
932                 if (flags & OLF_IDX_IN_FID) {
933                         LASSERT(dev->od_index >= 0);
934
935                         tfid.f_oid = dev->od_index;
936                 }
937         }
938
939         rc = osd_fid_lookup(env, dev, &tfid, &oid2);
940         if (rc) {
941                 if (rc != -ENOENT) {
942                         CWARN("%s: initial OI scrub failed to lookup fid for "
943                               DFID"=>%llu: rc = %d\n",
944                               osd_name(dev), PFID(&tfid), oid, rc);
945
946                         RETURN(rc);
947                 }
948
949                 flag = SF_RECREATED;
950                 op = DTO_INDEX_INSERT;
951         } else {
952                 if (oid == oid2)
953                         RETURN(0);
954
955                 flag = SF_INCONSISTENT;
956                 op = DTO_INDEX_UPDATE;
957         }
958
959         if (!(sf->sf_flags & flag)) {
960                 scrub_file_reset(scrub, dev->od_uuid, flag);
961                 rc = scrub_file_store(env, scrub);
962                 if (rc)
963                         RETURN(rc);
964         }
965
966         rc = osd_scrub_refresh_mapping(env, dev, &tfid, oid, op, true, name);
967
968         RETURN(rc > 0 ? 0 : rc);
969 }
970
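/*
 * Handler for entries whose FID is taken from their own LMA xattr rather
 * than from a fixed map entry; sub-directories are queued for further
 * scanning.
 */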
971 static int osd_ios_varfid_hd(const struct lu_env *env, struct osd_device *dev,
972                              const char *name, uint64_t parent, uint64_t oid,
973                              enum osd_lf_flags flags, bool is_dir)
974 {
975         int rc;
976         ENTRY;
977
978         rc = osd_ios_scan_one(env, dev, NULL, parent, oid, name, 0);
979         if (!rc && is_dir)
980                 rc = osd_ios_new_item(dev, oid, flags, osd_ios_general_sd,
981                                       osd_ios_varfid_hd);
982
983         RETURN(rc);
984 }
985
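/*
 * Handler for update_log_dir entries, whose names are DFID strings such as
 * "[0xseq:0xoid:0xver]"; the FID is parsed from the name itself.
 */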
986 static int osd_ios_uld_hd(const struct lu_env *env, struct osd_device *dev,
987                           const char *name, uint64_t parent, uint64_t oid,
988                           enum osd_lf_flags flags, bool is_dir)
989 {
990         struct lu_fid tfid;
991         int rc;
992         ENTRY;
993
994         /* skip any non-DFID format name */
995         if (name[0] != '[')
996                 RETURN(0);
997
998         /* skip the start '[' */
999         sscanf(&name[1], SFID, RFID(&tfid));
1000         if (fid_is_sane(&tfid))
1001                 rc = osd_ios_scan_one(env, dev, &tfid, parent, oid, name, 0);
1002         else
1003                 rc = -EIO;
1004
1005         RETURN(rc);
1006 }
1007
1008 /*
1009  * General scanner for directories other than /ROOT during the initial OI
1010  * scrub. It scans the name entries under the given directory one by one and,
1011  * for each entry, verifies its OI mapping via the given @handle_dirent.
1012  */
1013 static int osd_ios_general_sd(const struct lu_env *env, struct osd_device *dev,
1014                               uint64_t parent, handle_dirent_t handle_dirent,
1015                               enum osd_lf_flags flags)
1016 {
1017         struct osd_thread_info *info = osd_oti_get(env);
1018         struct luz_direntry *zde = &info->oti_zde;
1019         zap_attribute_t *za = &info->oti_za;
1020         zap_cursor_t *zc = &info->oti_zc;
1021         int rc;
1022         ENTRY;
1023
1024         zap_cursor_init_serialized(zc, dev->od_os, parent, 0);
1025         rc = -zap_cursor_retrieve(zc, za);
1026         if (rc == -ENOENT)
1027                 zap_cursor_advance(zc);
1028         else if (rc)
1029                 GOTO(log, rc);
1030
1031         while (1) {
1032                 rc = -zap_cursor_retrieve(zc, za);
1033                 if (rc)
1034                         GOTO(log, rc = (rc == -ENOENT ? 0 : rc));
1035
1036                 /* skip entries whose names start with '.' */
1037                 if (likely(za->za_name[0] != '.')) {
1038                         rc = osd_zap_lookup(dev, parent, NULL, za->za_name,
1039                                         za->za_integer_length,
1040                                         sizeof(*zde) / za->za_integer_length,
1041                                         (void *)zde);
1042                         if (rc) {
1043                                 CWARN("%s: initial OI scrub failed to lookup "
1044                                       "%s under %llu: rc = %d\n",
1045                                       osd_name(dev), za->za_name, parent, rc);
1046                                 continue;
1047                         }
1048
1049                         rc = handle_dirent(env, dev, za->za_name, parent,
1050                                         zde->lzd_reg.zde_dnode, flags,
1051                                         S_ISDIR(DTTOIF(zde->lzd_reg.zde_type)) ?
1052                                         true : false);
1053                         CDEBUG(D_LFSCK, "%s: initial OI scrub handled %s under "
1054                                "%llu: rc = %d\n",
1055                                osd_name(dev), za->za_name, parent, rc);
1056                 }
1057
1058                 zap_cursor_advance(zc);
1059         }
1060
1061 log:
1062         if (rc)
1063                 CWARN("%s: initial OI scrub failed to scan the directory %llu: "
1064                       "rc = %d\n", osd_name(dev), parent, rc);
1065         zap_cursor_fini(zc);
1066
1067         return rc;
1068 }
1069
1070 /*
1071  * The scanner for the /ROOT directory. Not all items under /ROOT are scanned
1072  * during the initial OI scrub; only .lustre and the sub-items under .lustre
1073  * are handled.
1074  */
1075 static int osd_ios_ROOT_sd(const struct lu_env *env, struct osd_device *dev,
1076                            uint64_t parent, handle_dirent_t handle_dirent,
1077                            enum osd_lf_flags flags)
1078 {
1079         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1080         const struct osd_lf_map *map;
1081         uint64_t oid;
1082         int rc;
1083         int rc1 = 0;
1084         ENTRY;
1085
1086         rc = osd_zap_lookup(dev, parent, NULL, dot_lustre_name, 8,
1087                             sizeof(*zde) / 8, (void *)zde);
1088         if (rc == -ENOENT) {
1089                 /* The .lustre directory is lost. That is not fatal. It can
1090                  * be re-created in the subsequent MDT start processing. */
1091                 RETURN(0);
1092         }
1093
1094         if (rc) {
1095                 CWARN("%s: initial OI scrub failed to find .lustre: "
1096                       "rc = %d\n", osd_name(dev), rc);
1097
1098                 RETURN(rc);
1099         }
1100
1101         oid = zde->lzd_reg.zde_dnode;
1102         rc = osd_ios_scan_one(env, dev, &LU_DOT_LUSTRE_FID, parent, oid,
1103                               dot_lustre_name, 0);
1104         if (rc)
1105                 RETURN(rc);
1106
1107         for (map = osd_dl_maps; map->olm_name; map++) {
1108                 rc = osd_zap_lookup(dev, oid, NULL, map->olm_name, 8,
1109                                     sizeof(*zde) / 8, (void *)zde);
1110                 if (rc) {
1111                         if (rc != -ENOENT)
1112                                 CWARN("%s: initial OI scrub failed to find "
1113                                       "the entry %s under .lustre: rc = %d\n",
1114                                       osd_name(dev), map->olm_name, rc);
1115                         else if (!fid_is_zero(&map->olm_fid))
1116                                 /* Try to remove the stale OI mapping. */
1117                                 osd_scrub_refresh_mapping(env, dev,
1118                                                 &map->olm_fid, 0,
1119                                                 DTO_INDEX_DELETE, true,
1120                                                 map->olm_name);
1121                         continue;
1122                 }
1123
1124                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, oid,
1125                                       zde->lzd_reg.zde_dnode, map->olm_name,
1126                                       map->olm_flags);
1127                 if (rc)
1128                         rc1 = rc;
1129         }
1130
1131         RETURN(rc1);
1132 }
1133
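/*
 * Entry point of the initial (synchronous) OI scrub: verify the well-known
 * local files listed in osd_lf_maps under the root directory, then drain the
 * queued sub-directories collected on od_ios_list.
 */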
1134 static void osd_initial_OI_scrub(const struct lu_env *env,
1135                                  struct osd_device *dev)
1136 {
1137         struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
1138         const struct osd_lf_map *map;
1139         int rc;
1140         ENTRY;
1141
1142         for (map = osd_lf_maps; map->olm_name; map++) {
1143                 rc = osd_zap_lookup(dev, dev->od_root, NULL, map->olm_name, 8,
1144                                     sizeof(*zde) / 8, (void *)zde);
1145                 if (rc) {
1146                         if (rc != -ENOENT)
1147                                 CWARN("%s: initial OI scrub failed "
1148                                       "to find the entry %s: rc = %d\n",
1149                                       osd_name(dev), map->olm_name, rc);
1150                         else if (!fid_is_zero(&map->olm_fid))
1151                                 /* Try to remove the stale OI mapping. */
1152                                 osd_scrub_refresh_mapping(env, dev,
1153                                                 &map->olm_fid, 0,
1154                                                 DTO_INDEX_DELETE, true,
1155                                                 map->olm_name);
1156                         continue;
1157                 }
1158
1159                 rc = osd_ios_scan_one(env, dev, &map->olm_fid, dev->od_root,
1160                                       zde->lzd_reg.zde_dnode, map->olm_name,
1161                                       map->olm_flags);
1162                 if (!rc && map->olm_flags & OLF_SCAN_SUBITEMS)
1163                         osd_ios_new_item(dev, zde->lzd_reg.zde_dnode,
1164                                          map->olm_flags, map->olm_scan_dir,
1165                                          map->olm_handle_dirent);
1166         }
1167
1168         while (!list_empty(&dev->od_ios_list)) {
1169                 struct osd_ios_item *item;
1170
1171                 item = list_entry(dev->od_ios_list.next,
1172                                   struct osd_ios_item, oii_list);
1173                 list_del_init(&item->oii_list);
1174                 item->oii_scan_dir(env, dev, item->oii_parent,
1175                                    item->oii_handle_dirent, item->oii_flags);
1176                 OBD_FREE_PTR(item);
1177         }
1178
1179         EXIT;
1180 }
1181
1182 /* OI scrub start/stop */
1183
1184 int osd_scrub_start(const struct lu_env *env, struct osd_device *dev,
1185                     __u32 flags)
1186 {
1187         int rc;
1188         ENTRY;
1189
1190         if (dev->od_dt_dev.dd_rdonly)
1191                 RETURN(-EROFS);
1192
1193         /* od_otable_sem: prevent concurrent start/stop */
1194         down(&dev->od_otable_sem);
1195         rc = scrub_start(osd_scrub_main, &dev->od_scrub, dev, flags);
1196         up(&dev->od_otable_sem);
1197
1198         RETURN(rc == -EALREADY ? 0 : rc);
1199 }
1200
1201 static void osd_scrub_stop(struct osd_device *dev)
1202 {
1203         struct lustre_scrub *scrub = &dev->od_scrub;
1204         ENTRY;
1205
1206         /* od_otable_sem: prevent concurrent start/stop */
1207         down(&dev->od_otable_sem);
1208         scrub->os_paused = 1;
1209         scrub_stop(scrub);
1210         up(&dev->od_otable_sem);
1211
1212         EXIT;
1213 }
1214
1215 /* OI scrub setup/cleanup */
1216
1217 static const char osd_scrub_name[] = "OI_scrub";
1218
1219 int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev)
1220 {
1221         struct osd_thread_info *info = osd_oti_get(env);
1222         struct lustre_scrub *scrub = &dev->od_scrub;
1223         struct scrub_file *sf = &scrub->os_file;
1224         struct lu_fid *fid = &info->oti_fid;
1225         struct dt_object *obj;
1226         uint64_t oid;
1227         int rc = 0;
1228         bool dirty = false;
1229         ENTRY;
1230
1231         memcpy(dev->od_uuid,
1232                &dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid,
1233                sizeof(dsl_dataset_phys(dev->od_os->os_dsl_dataset)->ds_guid));
1234         memset(&dev->od_scrub, 0, sizeof(struct lustre_scrub));
1235         init_waitqueue_head(&scrub->os_thread.t_ctl_waitq);
1236         init_rwsem(&scrub->os_rwsem);
1237         spin_lock_init(&scrub->os_lock);
1238         INIT_LIST_HEAD(&scrub->os_inconsistent_items);
1239         scrub->os_name = osd_name(dev);
1240
1241         /* The exact value of @fid is not important, because the object
1242          * has no OI mapping and is only visible inside the OSD. */
1243         fid->f_seq = FID_SEQ_IGIF_MAX;
1244         if (dev->od_is_ost)
1245                 fid->f_oid = ((1 << 31) | dev->od_index) + 1;
1246         else
1247                 fid->f_oid = dev->od_index + 1;
1248         fid->f_ver = 0;
1249         rc = osd_obj_find_or_create(env, dev, dev->od_root,
1250                                     osd_scrub_name, &oid, fid, false);
1251         if (rc)
1252                 RETURN(rc);
1253
1254         rc = osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1255         if (rc)
1256                 RETURN(rc);
1257
1258         obj = lu2dt(lu_object_find_slice(env, osd2lu_dev(dev), fid, NULL));
1259         if (IS_ERR_OR_NULL(obj))
1260                 RETURN(obj ? PTR_ERR(obj) : -ENOENT);
1261
1262         scrub->os_obj = obj;
1263         rc = scrub_file_load(env, scrub);
1264         if (rc == -ENOENT || rc == -EFAULT) {
1265                 scrub_file_init(scrub, dev->od_uuid);
1266                 dirty = true;
1267         } else if (rc < 0) {
1268                 GOTO(cleanup_obj, rc);
1269         } else {
1270                 if (memcmp(sf->sf_uuid, dev->od_uuid, 16) != 0) {
1271                         struct obd_uuid *old_uuid;
1272                         struct obd_uuid *new_uuid;
1273
1274                         OBD_ALLOC_PTR(old_uuid);
1275                         OBD_ALLOC_PTR(new_uuid);
1276                         if (!old_uuid || !new_uuid) {
1277                                 CERROR("%s: UUID has been changed, but "
1278                                        "failed to allocate RAM for report\n",
1279                                        osd_name(dev));
1280                         } else {
1281                                 class_uuid_unparse(sf->sf_uuid, old_uuid);
1282                                 class_uuid_unparse(dev->od_uuid, new_uuid);
1283                                 CDEBUG(D_LFSCK, "%s: UUID has been changed "
1284                                        "from %s to %s\n", osd_name(dev),
1285                                        old_uuid->uuid, new_uuid->uuid);
1286                         }
1287                         scrub_file_reset(scrub, dev->od_uuid, SF_INCONSISTENT);
1288                         dirty = true;
1289                         if (old_uuid)
1290                                 OBD_FREE_PTR(old_uuid);
1291                         if (new_uuid)
1292                                 OBD_FREE_PTR(new_uuid);
1293                 } else if (sf->sf_status == SS_SCANNING) {
1294                         sf->sf_status = SS_CRASHED;
1295                         dirty = true;
1296                 }
1297
1298                 if ((sf->sf_oi_count & (sf->sf_oi_count - 1)) != 0) {
1299                         LCONSOLE_WARN("%s: invalid oi count %d, set it to %d\n",
1300                                       osd_name(dev), sf->sf_oi_count,
1301                                       osd_oi_count);
1302                         sf->sf_oi_count = osd_oi_count;
1303                         dirty = true;
1304                 }
1305         }
1306
1307         if (sf->sf_pos_last_checkpoint != 0)
1308                 scrub->os_pos_current = sf->sf_pos_last_checkpoint + 1;
1309         else
1310                 scrub->os_pos_current = 1;
1311
1312         if (dirty) {
1313                 rc = scrub_file_store(env, scrub);
1314                 if (rc)
1315                         GOTO(cleanup_obj, rc);
1316         }
1317
1318         /* Initialize OI files. */
1319         rc = osd_oi_init(env, dev);
1320         if (rc < 0)
1321                 GOTO(cleanup_obj, rc);
1322
1323         if (!dev->od_dt_dev.dd_rdonly)
1324                 osd_initial_OI_scrub(env, dev);
1325
1326         if (!dev->od_dt_dev.dd_rdonly &&
1327             dev->od_auto_scrub_interval != AS_NEVER &&
1328             ((sf->sf_status == SS_PAUSED) ||
1329              (sf->sf_status == SS_CRASHED &&
1330               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1331                               SF_UPGRADE | SF_AUTO)) ||
1332              (sf->sf_status == SS_INIT &&
1333               sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
1334                               SF_UPGRADE))))
1335                 rc = osd_scrub_start(env, dev, SS_AUTO_FULL);
1336
1337         if (rc)
1338                 GOTO(cleanup_oi, rc);
1339
1340         RETURN(0);
1341
1342 cleanup_oi:
1343         osd_oi_fini(env, dev);
1344 cleanup_obj:
1345         dt_object_put_nocache(env, scrub->os_obj);
1346         scrub->os_obj = NULL;
1347
1348         return rc;
1349 }
1350
1351 void osd_scrub_cleanup(const struct lu_env *env, struct osd_device *dev)
1352 {
1353         struct lustre_scrub *scrub = &dev->od_scrub;
1354
1355         LASSERT(!dev->od_otable_it);
1356
1357         if (scrub->os_obj) {
1358                 osd_scrub_stop(dev);
1359                 dt_object_put_nocache(env, scrub->os_obj);
1360                 scrub->os_obj = NULL;
1361         }
1362
1363         if (dev->od_oi_table)
1364                 osd_oi_fini(env, dev);
1365 }
1366
1367 /* object table based iteration APIs */
1368
1369 static struct dt_it *osd_otable_it_init(const struct lu_env *env,
1370                                        struct dt_object *dt, __u32 attr)
1371 {
1372         enum dt_otable_it_flags flags = attr >> DT_OTABLE_IT_FLAGS_SHIFT;
1373         enum dt_otable_it_valid valid = attr & ~DT_OTABLE_IT_FLAGS_MASK;
1374         struct osd_device *dev = osd_dev(dt->do_lu.lo_dev);
1375         struct lustre_scrub *scrub = &dev->od_scrub;
1376         struct osd_otable_it *it;
1377         __u32 start = 0;
1378         int rc;
1379         ENTRY;
1380
1381         if (dev->od_dt_dev.dd_rdonly)
1382                 RETURN(ERR_PTR(-EROFS));
1383
1384         /* od_otable_sem: prevent concurrent init/fini */
1385         down(&dev->od_otable_sem);
1386         if (dev->od_otable_it)
1387                 GOTO(out, it = ERR_PTR(-EALREADY));
1388
1389         OBD_ALLOC_PTR(it);
1390         if (!it)
1391                 GOTO(out, it = ERR_PTR(-ENOMEM));
1392
1393         if (flags & DOIF_OUTUSED)
1394                 it->ooi_used_outside = 1;
1395
1396         if (flags & DOIF_RESET)
1397                 start |= SS_RESET;
1398
1399         if (valid & DOIV_ERROR_HANDLE) {
1400                 if (flags & DOIF_FAILOUT)
1401                         start |= SS_SET_FAILOUT;
1402                 else
1403                         start |= SS_CLEAR_FAILOUT;
1404         }
1405
1406         if (valid & DOIV_DRYRUN) {
1407                 if (flags & DOIF_DRYRUN)
1408                         start |= SS_SET_DRYRUN;
1409                 else
1410                         start |= SS_CLEAR_DRYRUN;
1411         }
1412
1413         /* XXX: dmu_object_next() does NOT find dnodes allocated
1414          *      in the current non-committed txg, so we force txg
1415          *      commit to find all existing dnodes ... */
1416         txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
1417
1418         dev->od_otable_it = it;
1419         it->ooi_dev = dev;
1420         rc = scrub_start(osd_scrub_main, scrub, dev, start & ~SS_AUTO_PARTIAL);
1421         if (rc == -EALREADY) {
1422                 it->ooi_pos = 1;
1423         } else if (rc < 0) {
1424                 dev->od_otable_it = NULL;
1425                 OBD_FREE_PTR(it);
1426                 it = ERR_PTR(rc);
1427         } else {
1428                 it->ooi_pos = scrub->os_pos_current;
1429         }
1430
1431         GOTO(out, it);
1432
1433 out:
1434         up(&dev->od_otable_sem);
1435         return (struct dt_it *)it;
1436 }
1437
1438 static void osd_otable_it_fini(const struct lu_env *env, struct dt_it *di)
1439 {
1440         struct osd_otable_it *it = (struct osd_otable_it *)di;
1441         struct osd_device *dev = it->ooi_dev;
1442
1443         /* od_otable_sem: prevent concurrent init/fini */
1444         down(&dev->od_otable_sem);
1445         scrub_stop(&dev->od_scrub);
1446         LASSERT(dev->od_otable_it == it);
1447
1448         dev->od_otable_it = NULL;
1449         up(&dev->od_otable_sem);
1450         OBD_FREE_PTR(it);
1451 }
1452
1453 static int osd_otable_it_get(const struct lu_env *env,
1454                              struct dt_it *di, const struct dt_key *key)
1455 {
1456         return 0;
1457 }
1458
1459 static void osd_otable_it_put(const struct lu_env *env, struct dt_it *di)
1460 {
1461 }
1462
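/* Prefetch upcoming dnodes asynchronously so the iterator rarely blocks. */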
1463 static void osd_otable_it_preload(const struct lu_env *env,
1464                                   struct osd_otable_it *it)
1465 {
1466         struct osd_device *dev = it->ooi_dev;
1467         int rc;
1468
1469         /* can go negative on the very first access to the iterator
1470          * or if some non-Lustre objects were found */
1471         if (unlikely(it->ooi_prefetched < 0))
1472                 it->ooi_prefetched = 0;
1473
1474         if (it->ooi_prefetched >= (OTABLE_PREFETCH >> 1))
1475                 return;
1476
1477         if (it->ooi_prefetched_dnode == 0)
1478                 it->ooi_prefetched_dnode = it->ooi_pos;
1479
1480         while (it->ooi_prefetched < OTABLE_PREFETCH) {
1481                 rc = -dmu_object_next(dev->od_os, &it->ooi_prefetched_dnode,
1482                                       B_FALSE, 0);
1483                 if (rc)
1484                         break;
1485
1486                 osd_dmu_prefetch(dev->od_os, it->ooi_prefetched_dnode,
1487                                  0, 0, 0, ZIO_PRIORITY_ASYNC_READ);
1488                 it->ooi_prefetched++;
1489         }
1490 }
1491
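/*
 * Wait condition for osd_otable_it_next(): the iterator may proceed once
 * the scrub position has moved past it, the scrub thread is waiting on
 * the iterator, or the scrub thread is no longer running.
 */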
1492 static inline int
1493 osd_otable_it_wakeup(struct lustre_scrub *scrub, struct osd_otable_it *it)
1494 {
1495         spin_lock(&scrub->os_lock);
1496         if (it->ooi_pos < scrub->os_pos_current || scrub->os_waiting ||
1497             !thread_is_running(&scrub->os_thread))
1498                 it->ooi_waiting = 0;
1499         else
1500                 it->ooi_waiting = 1;
1501         spin_unlock(&scrub->os_lock);
1502
1503         return !it->ooi_waiting;
1504 }
1505
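/**
 * Advance the iterator to the next dnode that carries a Lustre LMA xattr
 * and cache its self FID in ooi_fid.
 *
 * \retval   0, the next Lustre object was found
 * \retval   1, the iteration is finished
 * \retval -ve, on error
 */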
1506 static int osd_otable_it_next(const struct lu_env *env, struct dt_it *di)
1507 {
1508         struct osd_otable_it *it = (struct osd_otable_it *)di;
1509         struct osd_device *dev = it->ooi_dev;
1510         struct lustre_scrub *scrub = &dev->od_scrub;
1511         struct ptlrpc_thread *thread = &scrub->os_thread;
1512         struct l_wait_info lwi = { 0 };
1513         struct lustre_mdt_attrs *lma = NULL;
1514         nvlist_t *nvbuf = NULL;
1515         int size = 0;
1516         int rc;
1517         ENTRY;
1518
1519         LASSERT(it->ooi_user_ready);
1520         fid_zero(&it->ooi_fid);
1521
1522         if (unlikely(it->ooi_all_cached))
1523                 RETURN(1);
1524
1525 again:
1526         if (nvbuf) {
1527                 nvlist_free(nvbuf);
1528                 nvbuf = NULL;
1529                 lma = NULL;
1530                 size = 0;
1531         }
1532
1533         if (it->ooi_pos >= scrub->os_pos_current)
1534                 l_wait_event(thread->t_ctl_waitq,
1535                              osd_otable_it_wakeup(scrub, it),
1536                              &lwi);
1537
1538         if (!thread_is_running(thread) && !it->ooi_used_outside)
1539                 GOTO(out, rc = 1);
1540
1541         rc = -dmu_object_next(dev->od_os, &it->ooi_pos, B_FALSE, 0);
1542         if (rc) {
1543                 if (unlikely(rc == -ESRCH)) {
1544                         it->ooi_all_cached = 1;
1545                         rc = 1;
1546                 }
1547
1548                 GOTO(out, rc);
1549         }
1550
1551         rc = __osd_xattr_load_by_oid(dev, it->ooi_pos, &nvbuf);
1552
1553         if (!scrub->os_full_speed)
1554                 spin_lock(&scrub->os_lock);
1555         it->ooi_prefetched--;
1556         if (!scrub->os_full_speed) {
1557                 if (scrub->os_waiting) {
1558                         scrub->os_waiting = 0;
1559                         wake_up_all(&thread->t_ctl_waitq);
1560                 }
1561                 spin_unlock(&scrub->os_lock);
1562         }
1563
1564         if (rc == -ENOENT || rc == -EEXIST || rc == -ENODATA)
1565                 goto again;
1566
1567         if (rc)
1568                 GOTO(out, rc);
1569
1570         LASSERT(nvbuf != NULL);
1571         rc = -nvlist_lookup_byte_array(nvbuf, XATTR_NAME_LMA,
1572                                        (uchar_t **)&lma, &size);
1573         if (rc || size == 0)
1574                 /* It is either a non-Lustre object or an OSD-internal
1575                  * object; ignore it and move on */
1576                 goto again;
1577
1578         LASSERTF(lma != NULL, "corrupted LMA, size %d\n", size);
1579         lustre_lma_swab(lma);
1580         if (unlikely(lma->lma_compat & LMAC_NOT_IN_OI ||
1581                      lma->lma_incompat & LMAI_AGENT))
1582                 goto again;
1583
1584         it->ooi_fid = lma->lma_self_fid;
1585
1586         GOTO(out, rc = 0);
1587
1588 out:
1589         if (nvbuf)
1590                 nvlist_free(nvbuf);
1591
1592         if (!rc && scrub->os_full_speed)
1593                 osd_otable_it_preload(env, it);
1594
1595         return rc;
1596 }
1597
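/*
 * The otable iterator is keyed by the dnode number; key() returns no key,
 * only key_size() below reports the size of that position value.
 */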
1598 static struct dt_key *osd_otable_it_key(const struct lu_env *env,
1599                                         const struct dt_it *di)
1600 {
1601         return NULL;
1602 }
1603
1604 static int osd_otable_it_key_size(const struct lu_env *env,
1605                                   const struct dt_it *di)
1606 {
1607         return sizeof(__u64);
1608 }
1609
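/**
 * Copy the FID cached by the last next() call into the caller's record.
 */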
1610 static int osd_otable_it_rec(const struct lu_env *env, const struct dt_it *di,
1611                              struct dt_rec *rec, __u32 attr)
1612 {
1613         struct osd_otable_it *it  = (struct osd_otable_it *)di;
1614         struct lu_fid *fid = (struct lu_fid *)rec;
1615
1616         *fid = it->ooi_fid;
1617         return 0;
1618 }
1619
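/**
 * Report the current dnode position so the caller can checkpoint it and
 * resume later via osd_otable_it_load().
 */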
1620 static __u64 osd_otable_it_store(const struct lu_env *env,
1621                                  const struct dt_it *di)
1622 {
1623         struct osd_otable_it *it = (struct osd_otable_it *)di;
1624
1625         return it->ooi_pos;
1626 }
1627
1628 /**
1629  * Set the OSD layer iteration start position based on the specified hash.
1630  */
1631 static int osd_otable_it_load(const struct lu_env *env,
1632                               const struct dt_it *di, __u64 hash)
1633 {
1634         struct osd_otable_it *it = (struct osd_otable_it *)di;
1635         struct osd_device *dev = it->ooi_dev;
1636         struct lustre_scrub *scrub = &dev->od_scrub;
1637         int rc;
1638         ENTRY;
1639
1640         /* Do not allow setting the iteration position once iteration has started. */
1641         if (it->ooi_user_ready)
1642                 RETURN(-EPERM);
1643
1644         if (hash > OSD_OTABLE_MAX_HASH)
1645                 hash = OSD_OTABLE_MAX_HASH;
1646
1647         /* The hash is the last checkpoint position;
1648          * we will start from the next one. */
1649         it->ooi_pos = hash + 1;
1650         it->ooi_prefetched = 0;
1651         it->ooi_prefetched_dnode = 0;
1652         it->ooi_user_ready = 1;
1653         if (!scrub->os_full_speed)
1654                 wake_up_all(&scrub->os_thread.t_ctl_waitq);
1655
1656         /* Kick off the OSD-layer iteration with the first next() call. */
1657         rc = osd_otable_it_next(env, (struct dt_it *)it);
1658
1659         RETURN(rc);
1660 }
1661
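/* key_rec() is not needed for the otable iterator. */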
1662 static int osd_otable_it_key_rec(const struct lu_env *env,
1663                                  const struct dt_it *di, void *key_rec)
1664 {
1665         return 0;
1666 }
1667
1668 const struct dt_index_operations osd_otable_ops = {
1669         .dio_it = {
1670                 .init     = osd_otable_it_init,
1671                 .fini     = osd_otable_it_fini,
1672                 .get      = osd_otable_it_get,
1673                 .put      = osd_otable_it_put,
1674                 .next     = osd_otable_it_next,
1675                 .key      = osd_otable_it_key,
1676                 .key_size = osd_otable_it_key_size,
1677                 .rec      = osd_otable_it_rec,
1678                 .store    = osd_otable_it_store,
1679                 .load     = osd_otable_it_load,
1680                 .key_rec  = osd_otable_it_key_rec,
1681         }
1682 };
1683
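/*
 * Usage sketch (illustrative only, not taken from this file): the LFSCK
 * layer is expected to drive osd_otable_ops roughly as below; "iops",
 * "last_checkpoint" and "fid" are placeholder names.
 *
 *	it = iops->init(env, dt, attr);
 *	rc = iops->load(env, it, last_checkpoint);
 *	while (rc == 0) {
 *		iops->rec(env, it, (struct dt_rec *)&fid, 0);
 *		... process the FID ...
 *		rc = iops->next(env, it);
 *	}
 *	iops->fini(env, it);
 */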
1684 /* high priority inconsistent items list APIs */
1685
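/**
 * Queue an inconsistent OI mapping (@fid/@oid) on the scrub thread's high
 * priority list so the scrub thread repairs it with high priority, waking
 * up the thread if the list was empty.
 *
 * \retval       0, the item was queued
 * \retval -EAGAIN, the scrub thread is not running
 * \retval -ENOMEM, on allocation failure
 */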
1686 int osd_oii_insert(const struct lu_env *env, struct osd_device *dev,
1687                    const struct lu_fid *fid, uint64_t oid, bool insert)
1688 {
1689         struct lustre_scrub *scrub = &dev->od_scrub;
1690         struct ptlrpc_thread *thread = &scrub->os_thread;
1691         struct osd_inconsistent_item *oii;
1692         bool wakeup = false;
1693         ENTRY;
1694
1695         osd_idc_find_and_init_with_oid(env, dev, fid, oid);
1696         OBD_ALLOC_PTR(oii);
1697         if (unlikely(!oii))
1698                 RETURN(-ENOMEM);
1699
1700         INIT_LIST_HEAD(&oii->oii_list);
1701         oii->oii_cache.oic_dev = dev;
1702         oii->oii_cache.oic_fid = *fid;
1703         oii->oii_cache.oic_dnode = oid;
1704         oii->oii_insert = insert;
1705
1706         spin_lock(&scrub->os_lock);
1707         if (unlikely(!thread_is_running(thread))) {
1708                 spin_unlock(&scrub->os_lock);
1709                 OBD_FREE_PTR(oii);
1710                 RETURN(-EAGAIN);
1711         }
1712
1713         if (list_empty(&scrub->os_inconsistent_items))
1714                 wakeup = true;
1715         list_add_tail(&oii->oii_list, &scrub->os_inconsistent_items);
1716         spin_unlock(&scrub->os_lock);
1717
1718         if (wakeup)
1719                 wake_up_all(&thread->t_ctl_waitq);
1720
1721         RETURN(0);
1722 }
1723
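/**
 * Look up @fid in the pending high priority inconsistent items list.
 *
 * \retval       0, found; *oid is set to the cached dnode number
 * \retval -ENOENT, the FID is not in the list
 */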
1724 int osd_oii_lookup(struct osd_device *dev, const struct lu_fid *fid,
1725                    uint64_t *oid)
1726 {
1727         struct lustre_scrub *scrub = &dev->od_scrub;
1728         struct osd_inconsistent_item *oii;
1729         int ret = -ENOENT;
1730         ENTRY;
1731
1732         spin_lock(&scrub->os_lock);
1733         list_for_each_entry(oii, &scrub->os_inconsistent_items, oii_list) {
1734                 if (lu_fid_eq(fid, &oii->oii_cache.oic_fid)) {
1735                         *oid = oii->oii_cache.oic_dnode;
1736                         ret = 0;
1737                         break;
1738                 }
1739         }
1740         spin_unlock(&scrub->os_lock);
1741
1742         RETURN(ret);
1743 }