4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2017, Intel Corporation.
26 * lustre/obdclass/scrub.c
28 * The OI scrub is used for checking and (re)building Object Index files
29 * that are usually backend special. Here are some general scrub related
30 * functions that can be shared by different backends for OI scrub.
32 * Author: Fan Yong <fan.yong@intel.com>
35 #define DEBUG_SUBSYSTEM S_LFSCK
37 #include <linux/kthread.h>
38 #include <lustre_scrub.h>
39 #include <lustre_lib.h>
40 #include <lustre_fid.h>
/* Map a dt_object to its owning dt_device via the embedded lu_device.
 * container_of0() also propagates ERR_PTR inputs unchanged. */
42 static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
44 return container_of0(obj->do_lu.lo_dev, struct dt_device, dd_lu_dev);
/*
 * Convert an on-disk (little-endian) scrub_file image @src into CPU
 * byte order in @des.  The 16-byte UUID and the OI bitmap are raw byte
 * arrays and are copied verbatim; every multi-byte field is swabbed
 * individually.  Must be kept in sync with scrub_file_to_le() below.
 */
47 static void scrub_file_to_cpu(struct scrub_file *des, struct scrub_file *src)
49 memcpy(des->sf_uuid, src->sf_uuid, 16);
50 des->sf_flags = le64_to_cpu(src->sf_flags);
51 des->sf_magic = le32_to_cpu(src->sf_magic);
52 des->sf_status = le16_to_cpu(src->sf_status);
53 des->sf_param = le16_to_cpu(src->sf_param);
54 des->sf_time_last_complete =
55 le64_to_cpu(src->sf_time_last_complete);
56 des->sf_time_latest_start =
57 le64_to_cpu(src->sf_time_latest_start);
58 des->sf_time_last_checkpoint =
59 le64_to_cpu(src->sf_time_last_checkpoint);
60 des->sf_pos_latest_start =
61 le64_to_cpu(src->sf_pos_latest_start);
62 des->sf_pos_last_checkpoint =
63 le64_to_cpu(src->sf_pos_last_checkpoint);
64 des->sf_pos_first_inconsistent =
65 le64_to_cpu(src->sf_pos_first_inconsistent);
66 des->sf_items_checked =
67 le64_to_cpu(src->sf_items_checked);
68 des->sf_items_updated =
69 le64_to_cpu(src->sf_items_updated);
70 des->sf_items_failed =
71 le64_to_cpu(src->sf_items_failed);
72 des->sf_items_updated_prior =
73 le64_to_cpu(src->sf_items_updated_prior);
74 des->sf_run_time = le32_to_cpu(src->sf_run_time);
75 des->sf_success_count = le32_to_cpu(src->sf_success_count);
76 des->sf_oi_count = le16_to_cpu(src->sf_oi_count);
77 des->sf_internal_flags = le16_to_cpu(src->sf_internal_flags);
78 memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
/*
 * Convert an in-memory (CPU byte order) scrub_file @src into the
 * little-endian on-disk image @des — the exact inverse of
 * scrub_file_to_cpu().  UUID and OI bitmap are copied verbatim.
 */
81 static void scrub_file_to_le(struct scrub_file *des, struct scrub_file *src)
83 memcpy(des->sf_uuid, src->sf_uuid, 16);
84 des->sf_flags = cpu_to_le64(src->sf_flags);
85 des->sf_magic = cpu_to_le32(src->sf_magic);
86 des->sf_status = cpu_to_le16(src->sf_status);
87 des->sf_param = cpu_to_le16(src->sf_param);
88 des->sf_time_last_complete =
89 cpu_to_le64(src->sf_time_last_complete);
90 des->sf_time_latest_start =
91 cpu_to_le64(src->sf_time_latest_start);
92 des->sf_time_last_checkpoint =
93 cpu_to_le64(src->sf_time_last_checkpoint);
94 des->sf_pos_latest_start =
95 cpu_to_le64(src->sf_pos_latest_start);
96 des->sf_pos_last_checkpoint =
97 cpu_to_le64(src->sf_pos_last_checkpoint);
98 des->sf_pos_first_inconsistent =
99 cpu_to_le64(src->sf_pos_first_inconsistent);
100 des->sf_items_checked =
101 cpu_to_le64(src->sf_items_checked);
102 des->sf_items_updated =
103 cpu_to_le64(src->sf_items_updated);
104 des->sf_items_failed =
105 cpu_to_le64(src->sf_items_failed);
106 des->sf_items_updated_prior =
107 cpu_to_le64(src->sf_items_updated_prior);
108 des->sf_run_time = cpu_to_le32(src->sf_run_time);
109 des->sf_success_count = cpu_to_le32(src->sf_success_count);
110 des->sf_oi_count = cpu_to_le16(src->sf_oi_count);
111 des->sf_internal_flags = cpu_to_le16(src->sf_internal_flags);
112 memcpy(des->sf_oi_bitmap, src->sf_oi_bitmap, SCRUB_OI_BITMAP_SIZE);
/*
 * Initialize the in-memory scrub file from scratch: zero all fields,
 * record the backend filesystem UUID, set the magic and mark the scrub
 * as never run (SS_INIT).
 */
115 void scrub_file_init(struct lustre_scrub *scrub, __u8 *uuid)
117 struct scrub_file *sf = &scrub->os_file;
119 memset(sf, 0, sizeof(*sf));
120 memcpy(sf->sf_uuid, uuid, 16);
121 sf->sf_magic = SCRUB_MAGIC_V1;
122 sf->sf_status = SS_INIT;
124 EXPORT_SYMBOL(scrub_file_init);
/*
 * Reset the scrub file before a new (re-)scan: clear the per-run
 * positions and counters, OR the caller-supplied @flags into sf_flags
 * and drop SF_AUTO.  Cumulative history (sf_run_time, sf_success_count,
 * sf_time_last_complete) is deliberately left untouched here.
 */
126 void scrub_file_reset(struct lustre_scrub *scrub, __u8 *uuid, __u64 flags)
128 struct scrub_file *sf = &scrub->os_file;
130 CDEBUG(D_LFSCK, "%s: reset OI scrub file, old flags = "
131 "%#llx, add flags = %#llx\n",
132 scrub->os_name, sf->sf_flags, flags);
134 memcpy(sf->sf_uuid, uuid, 16);
135 sf->sf_status = SS_INIT;
136 sf->sf_flags |= flags;
137 sf->sf_flags &= ~SF_AUTO;
139 sf->sf_time_latest_start = 0;
140 sf->sf_time_last_checkpoint = 0;
141 sf->sf_pos_latest_start = 0;
142 sf->sf_pos_last_checkpoint = 0;
143 sf->sf_pos_first_inconsistent = 0;
144 sf->sf_items_checked = 0;
145 sf->sf_items_updated = 0;
146 sf->sf_items_failed = 0;
147 sf->sf_items_noscrub = 0;
148 sf->sf_items_igif = 0;
/* Keep the prior-updated count while joining an in-progress scrub. */
149 if (!scrub->os_in_join)
150 sf->sf_items_updated_prior = 0;
152 EXPORT_SYMBOL(scrub_file_reset);
/*
 * Load the scrub file from the backend object into the staging buffer
 * scrub->os_file_disk, then convert it to CPU byte order in
 * scrub->os_file.  A short read or a bad magic is treated as a
 * corrupted/invalid scrub file (exact return codes for those paths are
 * on lines elided from this view — TODO confirm against full source).
 */
154 int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub)
156 struct scrub_file *sf = &scrub->os_file;
157 struct lu_buf buf = {
158 .lb_buf = &scrub->os_file_disk,
159 .lb_len = sizeof(scrub->os_file_disk)
164 rc = dt_read(env, scrub->os_obj, &buf, &pos);
167 CERROR("%s: fail to load scrub file: rc = %d\n",
/* dt_read() returns bytes read; anything short of a full record is a
 * failure. */
177 if (rc < buf.lb_len) {
178 CDEBUG(D_LFSCK, "%s: fail to load scrub file, "
179 "expected = %d: rc = %d\n",
180 scrub->os_name, (int)buf.lb_len, rc);
184 scrub_file_to_cpu(sf, &scrub->os_file_disk);
185 if (sf->sf_magic != SCRUB_MAGIC_V1) {
186 CDEBUG(D_LFSCK, "%s: invalid scrub magic 0x%x != 0x%x\n",
187 scrub->os_name, sf->sf_magic, SCRUB_MAGIC_V1);
193 EXPORT_SYMBOL(scrub_file_load);
/*
 * Persist the in-memory scrub file: convert it to little-endian in
 * scrub->os_file_disk, then write it via the usual local-transaction
 * sequence (create trans -> declare record write -> start -> write ->
 * stop).  Afterwards the checkpoint timer is re-armed:
 * os_time_next_checkpoint = now + SCRUB_CHECKPOINT_INTERVAL.
 */
195 int scrub_file_store(const struct lu_env *env, struct lustre_scrub *scrub)
197 struct scrub_file *sf = &scrub->os_file_disk;
198 struct dt_object *obj = scrub->os_obj;
199 struct dt_device *dev = scrub_obj2dev(obj);
200 struct lu_buf buf = {
202 .lb_len = sizeof(*sf)
209 /* Skip store under rdonly mode. */
213 scrub_file_to_le(sf, &scrub->os_file);
214 th = dt_trans_create(env, dev);
216 GOTO(log, rc = PTR_ERR(th));
218 rc = dt_declare_record_write(env, obj, &buf, pos, th);
222 rc = dt_trans_start_local(env, dev, th);
226 rc = dt_record_write(env, obj, &buf, &pos, th);
231 dt_trans_stop(env, dev, th);
235 CERROR("%s: store scrub file: rc = %d\n",
238 CDEBUG(D_LFSCK, "%s: store scrub file: rc = %d\n",
241 scrub->os_time_last_checkpoint = cfs_time_current();
242 scrub->os_time_next_checkpoint = scrub->os_time_last_checkpoint +
243 cfs_time_seconds(SCRUB_CHECKPOINT_INTERVAL);
246 EXPORT_SYMBOL(scrub_file_store);
/*
 * Periodically checkpoint scrub progress to disk.  Cheap fast path:
 * do nothing until the next checkpoint deadline has passed AND some new
 * items were checked.  Otherwise fold os_new_checked into the on-disk
 * counters, record current position/time, accumulate run time (HALF_SEC
 * added for rounding), and store the file — all under the write side of
 * os_rwsem to exclude concurrent readers/writers of os_file.
 */
248 int scrub_checkpoint(const struct lu_env *env, struct lustre_scrub *scrub)
250 struct scrub_file *sf = &scrub->os_file;
253 if (likely(cfs_time_before(cfs_time_current(),
254 scrub->os_time_next_checkpoint) ||
255 scrub->os_new_checked == 0))
258 CDEBUG(D_LFSCK, "%s: OI scrub checkpoint at pos %llu\n",
259 scrub->os_name, scrub->os_pos_current);
261 down_write(&scrub->os_rwsem);
262 sf->sf_items_checked += scrub->os_new_checked;
263 scrub->os_new_checked = 0;
264 sf->sf_pos_last_checkpoint = scrub->os_pos_current;
265 sf->sf_time_last_checkpoint = cfs_time_current_sec();
266 sf->sf_run_time += cfs_duration_sec(cfs_time_current() + HALF_SEC -
267 scrub->os_time_last_checkpoint);
268 rc = scrub_file_store(env, scrub);
269 up_write(&scrub->os_rwsem);
273 EXPORT_SYMBOL(scrub_checkpoint);
/*
 * Launch the OI scrub kernel thread running @threadfn(@data).
 * Serializes against scrub_stop()/the scrub thread via os_lock: if the
 * thread is already running this is a no-op (return path elided from
 * this view); if it is stopping, wait for it to stop before starting a
 * new one.  After a completed scrub, FAILOUT/DRYRUN are cleared unless
 * the caller explicitly re-sets them.  Waits until the new thread is
 * either running or already stopped before returning.
 */
275 int scrub_start(int (*threadfn)(void *data), struct lustre_scrub *scrub,
276 void *data, __u32 flags)
278 struct ptlrpc_thread *thread = &scrub->os_thread;
279 struct l_wait_info lwi = { 0 };
280 struct task_struct *task;
285 /* os_lock: sync status between stop and scrub thread */
286 spin_lock(&scrub->os_lock);
287 if (thread_is_running(thread)) {
288 spin_unlock(&scrub->os_lock);
292 if (unlikely(thread_is_stopping(thread))) {
293 spin_unlock(&scrub->os_lock);
294 l_wait_event(thread->t_ctl_waitq,
295 thread_is_stopped(thread),
299 spin_unlock(&scrub->os_lock);
/* A finished scrub restarts with FAILOUT/DRYRUN cleared by default. */
301 if (scrub->os_file.sf_status == SS_COMPLETED) {
302 if (!(flags & SS_SET_FAILOUT))
303 flags |= SS_CLEAR_FAILOUT;
305 if (!(flags & SS_SET_DRYRUN))
306 flags |= SS_CLEAR_DRYRUN;
311 scrub->os_start_flags = flags;
312 thread_set_flags(thread, 0);
313 task = kthread_run(threadfn, data, "OI_scrub");
316 CERROR("%s: cannot start iteration thread: rc = %d\n",
321 l_wait_event(thread->t_ctl_waitq,
322 thread_is_running(thread) || thread_is_stopped(thread),
327 EXPORT_SYMBOL(scrub_start);
/*
 * Ask the OI scrub thread to stop and wait until it has fully exited.
 * If the thread was never started (init) or already stopped there is
 * nothing to signal.
 */
329 void scrub_stop(struct lustre_scrub *scrub)
331 struct ptlrpc_thread *thread = &scrub->os_thread;
332 struct l_wait_info lwi = { 0 };
334 /* os_lock: sync status between stop and scrub thread */
335 spin_lock(&scrub->os_lock);
336 if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
337 thread_set_flags(thread, SVC_STOPPING);
338 spin_unlock(&scrub->os_lock);
339 wake_up_all(&thread->t_ctl_waitq);
340 l_wait_event(thread->t_ctl_waitq,
341 thread_is_stopped(thread),
343 /* Do not skip the last lock/unlock, which can guarantee that
344 * the caller cannot return until the OI scrub thread exits. */
345 spin_lock(&scrub->os_lock);
347 spin_unlock(&scrub->os_lock);
349 EXPORT_SYMBOL(scrub_stop);
/* Human-readable name tables indexed by scrub status value, flag bit
 * and param bit respectively; consumed by scrub_dump() and
 * scrub_bits_dump() below.  (Initializer contents are elided from this
 * view — TODO confirm entries match SS_*/SF_*/SP_* definitions.) */
351 const char *scrub_status_names[] = {
362 const char *scrub_flags_names[] = {
370 const char *scrub_param_names[] = {
/*
 * Print "<prefix>: name1,name2,...\n" for every bit set in @bits, or
 * just "<prefix>:\n" when no bit is set.  The loop walks bit positions
 * via @flag = 1 << i and terminates when @bits reaches zero, so the
 * elided loop body presumably clears each printed bit from @bits —
 * which makes the ',' vs '\n' separator choice below work.
 * TODO confirm against the full source.
 */
376 static void scrub_bits_dump(struct seq_file *m, int bits, const char *names[],
382 seq_printf(m, "%s:%c", prefix, bits != 0 ? ' ' : '\n');
384 for (i = 0, flag = 1; bits != 0; i++, flag = 1 << i) {
387 seq_printf(m, "%s%c", names[i],
388 bits != 0 ? ',' : '\n');
/* Print "<prefix>: N seconds" (elapsed since @time) or "N/A" when the
 * timestamp has never been set (the guard condition line is elided from
 * this view — presumably `if (time != 0)`; TODO confirm). */
393 static void scrub_time_dump(struct seq_file *m, __u64 time, const char *prefix)
396 seq_printf(m, "%s: %llu seconds\n", prefix,
397 cfs_time_current_sec() - time);
399 seq_printf(m, "%s: N/A\n", prefix);
/* Print "<prefix>: <pos>" or "N/A" when the position is unset (guard
 * condition elided from this view — presumably `if (pos != 0)`). */
402 static void scrub_pos_dump(struct seq_file *m, __u64 pos, const char *prefix)
405 seq_printf(m, "%s: %llu\n", prefix, pos);
407 seq_printf(m, "%s: N/A\n", prefix);
/*
 * Dump the full OI scrub state to a seq_file (procfs/sysfs reader),
 * under the read side of os_rwsem so a concurrent checkpoint cannot
 * change os_file mid-dump.  Emits header fields, flag/param names,
 * timestamps, positions, item counters, and — when the scrub thread is
 * running — live speed/position statistics.
 */
410 void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub)
412 struct scrub_file *sf = &scrub->os_file;
416 down_read(&scrub->os_rwsem);
417 seq_printf(m, "name: OI_scrub\n"
421 sf->sf_magic, (int)sf->sf_oi_count,
422 scrub_status_names[sf->sf_status]);
424 scrub_bits_dump(m, sf->sf_flags, scrub_flags_names, "flags");
426 scrub_bits_dump(m, sf->sf_param, scrub_param_names, "param");
428 scrub_time_dump(m, sf->sf_time_last_complete,
429 "time_since_last_completed");
431 scrub_time_dump(m, sf->sf_time_latest_start,
432 "time_since_latest_start");
434 scrub_time_dump(m, sf->sf_time_last_checkpoint,
435 "time_since_last_checkpoint");
437 scrub_pos_dump(m, sf->sf_pos_latest_start,
438 "latest_start_position");
440 scrub_pos_dump(m, sf->sf_pos_last_checkpoint,
441 "last_checkpoint_position");
443 scrub_pos_dump(m, sf->sf_pos_first_inconsistent,
444 "first_failure_position");
/* Include items checked since the last checkpoint in the total. */
446 checked = sf->sf_items_checked + scrub->os_new_checked;
447 seq_printf(m, "checked: %llu\n"
453 "success_count: %u\n",
/* In dryrun mode "updated" counters actually record inconsistencies. */
455 sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
456 sf->sf_items_updated, sf->sf_items_failed,
457 sf->sf_param & SP_DRYRUN ? "inconsistent" : "updated",
458 sf->sf_items_updated_prior, sf->sf_items_noscrub,
459 sf->sf_items_igif, sf->sf_success_count);
/* Live statistics: real-time speed derived from progress since the
 * last checkpoint, average speed from the accumulated run time. */
462 if (thread_is_running(&scrub->os_thread)) {
463 cfs_duration_t duration = cfs_time_current() -
464 scrub->os_time_last_checkpoint;
465 __u64 new_checked = msecs_to_jiffies(scrub->os_new_checked *
467 __u32 rtime = sf->sf_run_time +
468 cfs_duration_sec(duration + HALF_SEC);
471 do_div(new_checked, duration);
473 do_div(speed, rtime);
474 seq_printf(m, "run_time: %u seconds\n"
475 "average_speed: %llu objects/sec\n"
476 "real-time_speed: %llu objects/sec\n"
477 "current_position: %llu\n"
478 "scrub_in_prior: %s\n"
479 "scrub_full_speed: %s\n"
480 "partial_scan: %s\n",
481 rtime, speed, new_checked, scrub->os_pos_current,
482 scrub->os_in_prior ? "yes" : "no",
483 scrub->os_full_speed ? "yes" : "no",
484 scrub->os_partial_scan ? "yes" : "no");
486 if (sf->sf_run_time != 0)
487 do_div(speed, sf->sf_run_time);
488 seq_printf(m, "run_time: %u seconds\n"
489 "average_speed: %llu objects/sec\n"
490 "real-time_speed: N/A\n"
491 "current_position: N/A\n",
492 sf->sf_run_time, speed);
495 up_read(&scrub->os_rwsem);
497 EXPORT_SYMBOL(scrub_dump);
/*
 * Allocate a lustre_index_restore_unit describing one index to restore
 * (parent FID, child FID, child local id, NUL-terminated name appended
 * to the struct) and add it to the tail of @head.  The allocation size
 * is recorded in liru_len for the eventual OBD_FREE by the consumer.
 * Returns 0 on success (allocation-failure return elided from this
 * view — presumably -ENOMEM).
 */
499 int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
500 const struct lu_fid *cfid, __u64 child,
501 const char *name, int namelen)
503 struct lustre_index_restore_unit *liru;
504 int len = sizeof(*liru) + namelen + 1;
506 OBD_ALLOC(liru, len);
510 INIT_LIST_HEAD(&liru->liru_link);
511 liru->liru_pfid = *pfid;
512 liru->liru_cfid = *cfid;
513 liru->liru_clid = child;
514 liru->liru_len = len;
515 memcpy(liru->liru_name, name, namelen);
516 liru->liru_name[namelen] = 0;
517 list_add_tail(&liru->liru_link, head);
521 EXPORT_SYMBOL(lustre_liru_new);
/*
 * Register an index (identified by @fid with the given key/record
 * sizes) for backup before device umount.  The list @head is kept
 * sorted by FID (reverse walk + insert after the first smaller entry);
 * @lock protects the list and @guard disables registration once backup
 * has started or the device is read-only.  A duplicate registration
 * with different sizes replaces the stored sizes, assuming the old ones
 * came from corrupted on-disk data.
 */
523 int lustre_index_register(struct dt_device *dev, const char *devname,
524 struct list_head *head, spinlock_t *lock, int *guard,
525 const struct lu_fid *fid,
526 __u32 keysize, __u32 recsize)
528 struct lustre_index_backup_unit *libu, *pos;
532 if (dev->dd_rdonly || *guard)
539 INIT_LIST_HEAD(&libu->libu_link);
540 libu->libu_keysize = keysize;
541 libu->libu_recsize = recsize;
542 libu->libu_fid = *fid;
/* Re-check the guard under the lock; it may have flipped meanwhile. */
545 if (unlikely(*guard)) {
552 list_for_each_entry_reverse(pos, head, libu_link) {
553 rc = lu_fid_cmp(&pos->libu_fid, fid);
555 list_add(&libu->libu_link, &pos->libu_link);
562 /* Registered already. But the former registered one
563 * has different keysize/recsize. It may be because the
564 * former values are from disk and corrupted, then
565 * replace it with new values. */
566 if (unlikely(keysize != pos->libu_keysize ||
567 recsize != pos->libu_recsize)) {
568 CWARN("%s: the index "DFID" has registered "
569 "with %u/%u, may be invalid, replace "
571 devname, PFID(fid), pos->libu_keysize,
572 pos->libu_recsize, keysize, recsize);
574 pos->libu_keysize = keysize;
575 pos->libu_recsize = recsize;
/* Smaller than every existing FID (or list empty): insert at head. */
587 list_add(&libu->libu_link, head);
592 EXPORT_SYMBOL(lustre_index_register);
/*
 * Remove the backup-unit registered for @fid from @head (under @lock).
 * The reverse walk mirrors the sorted insert in lustre_index_register;
 * a FID smaller than the current entry means it was never registered.
 */
594 static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
595 const struct lu_fid *fid)
597 struct lustre_index_backup_unit *libu;
601 list_for_each_entry_reverse(libu, head, libu_link) {
602 rc = lu_fid_cmp(&libu->libu_fid, fid);
603 /* NOT registered. */
608 list_del(&libu->libu_link);
/*
 * Fill a backup-file header (all fields little-endian on disk):
 * magic, number of key/record pairs, key and record sizes, and the
 * FID of the index the backup belongs to.
 */
619 lustre_index_backup_make_header(struct lustre_index_backup_header *header,
620 __u32 keysize, __u32 recsize,
621 const struct lu_fid *fid, __u32 count)
623 memset(header, 0, sizeof(*header));
624 header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
625 header->libh_count = cpu_to_le32(count);
626 header->libh_keysize = cpu_to_le32(keysize);
627 header->libh_recsize = cpu_to_le32(recsize);
628 fid_cpu_to_le(&header->libh_owner, fid);
/*
 * Append @bufsize bytes from @buf to the backup object at *@pos inside
 * one local transaction (declare -> start -> write -> stop); on success
 * dt_record_write() advances *@pos past the written data.
 */
631 static int lustre_index_backup_body(const struct lu_env *env,
632 struct dt_object *obj, loff_t *pos,
633 void *buf, int bufsize)
635 struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
637 struct lu_buf lbuf = {
644 th = dt_trans_create(env, dev);
648 rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
652 rc = dt_trans_start_local(env, dev, th);
656 rc = dt_record_write(env, obj, &lbuf, pos, th);
661 dt_trans_stop(env, dev, th);
/*
 * Finish a backup file after its body has been written: write the
 * header at offset 0 and, if the file was longer from a previous
 * backup, punch everything past the expected size
 * (header + count * (keysize + recsize)).  @buf is scratch space reused
 * for both the lu_attr read and the header; hence the size LASSERTs.
 */
665 static int lustre_index_backup_header(const struct lu_env *env,
666 struct dt_object *obj,
667 const struct lu_fid *tgt_fid,
668 __u32 keysize, __u32 recsize,
669 void *buf, int bufsize, int count)
671 struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
672 struct lustre_index_backup_header *header = buf;
673 struct lu_attr *la = buf;
675 struct lu_buf lbuf = {
677 .lb_len = sizeof(*header)
679 loff_t size = sizeof(*header) + (keysize + recsize) * count;
685 LASSERT(sizeof(*la) <= bufsize);
686 LASSERT(sizeof(*header) <= bufsize);
688 rc = dt_attr_get(env, obj, la);
/* Stale tail from an earlier, larger backup needs to be punched. */
692 if (la->la_size > size)
695 lustre_index_backup_make_header(header, keysize, recsize,
697 th = dt_trans_create(env, dev);
701 rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
706 rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
711 rc = dt_trans_start_local(env, dev, th);
715 rc = dt_record_write(env, obj, &lbuf, &pos, th);
717 rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
722 dt_trans_stop(env, dev, th);
/*
 * Mark the index object as "has backup" by setting LMAC_IDX_BACKUP in
 * its LMA xattr.  If the xattr does not exist yet it is initialized and
 * created (LU_XATTR_CREATE), otherwise replaced; a short existing LMA
 * is treated as corruption (-EFAULT).  Already-set flag is a no-op
 * (early-return path elided from this view).  Note lustre_lma_swab()
 * is applied both after reading and before writing, since LMA is
 * stored in disk byte order.
 */
726 static int lustre_index_update_lma(const struct lu_env *env,
727 struct dt_object *obj,
728 void *buf, int bufsize)
730 struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
731 struct lustre_mdt_attrs *lma = buf;
732 struct lu_buf lbuf = {
734 .lb_len = sizeof(struct lustre_ost_attrs)
737 int fl = LU_XATTR_REPLACE;
741 LASSERT(bufsize >= lbuf.lb_len);
743 rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
744 if (unlikely(rc == -ENODATA)) {
745 fl = LU_XATTR_CREATE;
746 lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
749 } else if (rc < sizeof(*lma)) {
750 RETURN(rc < 0 ? rc : -EFAULT);
752 lustre_lma_swab(lma);
753 if (lma->lma_compat & LMAC_IDX_BACKUP)
756 lma->lma_compat |= LMAC_IDX_BACKUP;
759 lustre_lma_swab(lma);
761 th = dt_trans_create(env, dev);
765 rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
769 rc = dt_trans_start_local(env, dev, th);
773 rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
778 dt_trans_stop(env, dev, th);
/*
 * Back up one registered index: locate the target index object by FID,
 * create (or reuse) a backup file named after the FID under @parent,
 * iterate all key/record pairs batching them through @buf, then write
 * the header and set LMAC_IDX_BACKUP in the target's LMA.  The body
 * starts at offset sizeof(header); the header is written last because
 * the pair count is only known after iteration.  A fault-injection
 * hook (OBD_FAIL_OSD_INDEX_CRASH) deliberately corrupts the index body
 * for testing the restore path.
 */
782 static int lustre_index_backup_one(const struct lu_env *env,
783 struct local_oid_storage *los,
784 struct dt_object *parent,
785 struct lustre_index_backup_unit *libu,
786 char *buf, int bufsize)
788 struct dt_device *dev = scrub_obj2dev(parent);
789 struct dt_object *tgt_obj = NULL;
790 struct dt_object *bak_obj = NULL;
791 const struct dt_it_ops *iops;
793 loff_t pos = sizeof(struct lustre_index_backup_header);
799 tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
800 &libu->libu_fid, NULL));
801 if (IS_ERR_OR_NULL(tgt_obj))
802 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
804 if (!dt_object_exists(tgt_obj))
/* Set up index ops with the registered key/record geometry if the
 * object has not been opened as an index yet. */
807 if (!tgt_obj->do_index_ops) {
808 struct dt_index_features feat;
810 feat.dif_flags = DT_IND_UPDATE;
811 feat.dif_keysize_min = libu->libu_keysize;
812 feat.dif_keysize_max = libu->libu_keysize;
813 feat.dif_recsize_min = libu->libu_recsize;
814 feat.dif_recsize_max = libu->libu_recsize;
815 feat.dif_ptrsize = 4;
816 rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
/* Backup file name is the target FID rendered into @buf. */
821 lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
822 bak_obj = local_file_find_or_create(env, los, parent, buf,
823 S_IFREG | S_IRUGO | S_IWUSR);
824 if (IS_ERR_OR_NULL(bak_obj))
825 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
827 iops = &tgt_obj->do_index_ops->dio_it;
828 di = iops->init(env, tgt_obj, 0);
830 GOTO(out, rc = PTR_ERR(di));
832 rc = iops->load(env, di, 0);
834 rc = iops->next(env, di);
/* Copy key then record for each entry; flush @buf when the next pair
 * would not fit. */
842 key = iops->key(env, di);
843 memcpy(&buf[size], key, libu->libu_keysize);
844 size += libu->libu_keysize;
846 rc = iops->rec(env, di, rec, 0);
850 size += libu->libu_recsize;
852 if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
853 rc = lustre_index_backup_body(env, bak_obj, &pos,
861 rc = iops->next(env, di);
/* Flush the final partial batch. */
864 if (rc >= 0 && size > 0)
865 rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
870 rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
871 libu->libu_keysize, libu->libu_recsize,
872 buf, bufsize, count);
874 rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
876 if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
877 LASSERT(bufsize >= 512);
881 lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
889 if (!IS_ERR_OR_NULL(tgt_obj))
890 dt_object_put_nocache(env, tgt_obj);
891 if (!IS_ERR_OR_NULL(bak_obj))
892 dt_object_put_nocache(env, bak_obj);
/*
 * Drain the registered-index list @head and, when @backup is requested,
 * write a backup file for each unit via lustre_index_backup_one().
 * Skipped entirely on read-only devices, when @guard is already set, or
 * during a partially failed mount (no lu_site/top device).  Uses the
 * local-object machinery: the backup directory is the well-known
 * INDEX_BACKUP_OID object and names are allocated from a
 * local_oid_storage instance.  Best-effort: per-index results are only
 * logged, not propagated.
 */
896 void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
897 const char *devname, struct list_head *head,
898 spinlock_t *lock, int *guard, bool backup)
900 struct lustre_index_backup_unit *libu;
901 struct local_oid_storage *los = NULL;
902 struct dt_object *parent = NULL;
908 if (dev->dd_rdonly || *guard)
915 if (list_empty(head))
918 /* Handle kinds of failures during mount process. */
919 if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
923 OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
929 lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
930 parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
932 if (IS_ERR_OR_NULL(parent)) {
933 CERROR("%s: failed to locate backup dir: rc = %ld\n",
934 devname, parent ? PTR_ERR(parent) : -ENOENT);
939 lu_local_name_obj_fid(&fid, 1);
940 rc = local_oid_storage_init(env, dev, &fid, &los);
942 CERROR("%s: failed to init local storage: rc = %d\n",
/* Pop units one at a time so concurrent register/degister under @lock
 * stays consistent (locking lines elided from this view). */
950 while (!list_empty(head)) {
951 libu = list_entry(head->next,
952 struct lustre_index_backup_unit, libu_link);
953 list_del_init(&libu->libu_link);
957 rc = lustre_index_backup_one(env, los, parent, libu,
958 buf, INDEX_BACKUP_BUFSIZE);
959 CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
960 devname, PFID(&libu->libu_fid), rc);
969 local_oid_storage_fini(env, los);
971 dt_object_put_nocache(env, parent);
973 OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
977 EXPORT_SYMBOL(lustre_index_backup);
/*
 * Restore a (possibly corrupted) index object from its backup file.
 *
 * @buf is caller-provided scratch memory that is carved into a lu_attr,
 * a dt_object_format, dt_index_features and the backup header, then
 * reused as the bulk read buffer for key/record pairs.
 *
 * The restore runs as a sequence of local transactions:
 *   T1: delete the old name entry under @parent_fid and destroy the old
 *       index object;
 *   T2: re-create the index (same FID, geometry taken from the backup
 *       header) and re-insert the name entry;
 *   Tn: one transaction per key/record pair read back from the backup.
 * The backup header is validated (magic + owner FID must match
 * @tgt_fid) before anything destructive happens.  On failure after the
 * index got re-registered (via index_try), it is degistered again so a
 * later backup pass cannot overwrite the still-needed backup file.
 */
979 int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
980 const struct lu_fid *parent_fid,
981 const struct lu_fid *tgt_fid,
982 const struct lu_fid *bak_fid, const char *name,
983 struct list_head *head, spinlock_t *lock,
984 char *buf, int bufsize)
986 struct dt_object *parent_obj = NULL;
987 struct dt_object *tgt_obj = NULL;
988 struct dt_object *bak_obj = NULL;
989 struct lustre_index_backup_header *header;
990 struct dt_index_features *feat;
991 struct dt_object_format *dof;
994 struct lu_object_conf conf;
995 struct dt_insert_rec ent;
1004 bool registered = false;
1007 LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
1008 sizeof(*feat) + sizeof(*header));
/* Carve the scratch buffer into the helper structures. */
1010 memset(buf, 0, bufsize);
1011 la = (struct lu_attr *)buf;
1012 dof = (void *)la + sizeof(*la);
1013 feat = (void *)dof + sizeof(*dof);
1014 header = (void *)feat + sizeof(*feat);
1015 lbuf.lb_buf = header;
1016 lbuf.lb_len = sizeof(*header);
1018 tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1020 if (IS_ERR_OR_NULL(tgt_obj))
1021 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1023 bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1025 if (IS_ERR_OR_NULL(bak_obj))
1026 GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
1028 if (!dt_object_exists(bak_obj))
1029 GOTO(out, rc = -ENOENT);
1031 parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1033 if (IS_ERR_OR_NULL(parent_obj))
1034 GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
1036 LASSERT(dt_object_exists(parent_obj));
1038 if (unlikely(!dt_try_as_dir(env, parent_obj)))
1039 GOTO(out, rc = -ENOTDIR);
/* Save old ownership/mode so the re-created index matches. */
1041 rc = dt_attr_get(env, tgt_obj, la);
1045 rc = dt_record_read(env, bak_obj, &lbuf, &pos);
/* Validate the backup before destroying anything. */
1049 if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
1050 GOTO(out, rc = -EINVAL);
1052 fid_le_to_cpu(&tfid, &header->libh_owner);
1053 if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
1054 GOTO(out, rc = -EINVAL);
1056 keysize = le32_to_cpu(header->libh_keysize);
1057 recsize = le32_to_cpu(header->libh_recsize);
1058 pairsize = keysize + recsize;
1060 memset(feat, 0, sizeof(*feat));
1061 feat->dif_flags = DT_IND_UPDATE;
1062 feat->dif_keysize_min = feat->dif_keysize_max = keysize;
1063 feat->dif_recsize_min = feat->dif_recsize_max = recsize;
1064 feat->dif_ptrsize = 4;
1066 /* T1: remove old name entry and destroy old index. */
1067 th = dt_trans_create(env, dev);
1069 GOTO(out, rc = PTR_ERR(th));
1071 rc = dt_declare_delete(env, parent_obj,
1072 (const struct dt_key *)name, th);
1076 rc = dt_declare_destroy(env, tgt_obj, th);
1080 rc = dt_trans_start_local(env, dev, th);
1084 rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
1088 dt_write_lock(env, tgt_obj, 0);
1089 rc = dt_destroy(env, tgt_obj, th);
1090 dt_write_unlock(env, tgt_obj);
1091 dt_trans_stop(env, dev, th);
1095 la->la_valid = LA_MODE | LA_UID | LA_GID;
1096 conf.loc_flags = LOC_F_NEW;
1097 dof->u.dof_idx.di_feat = feat;
1098 dof->dof_type = DFT_INDEX;
1099 ent.rec_type = S_IFREG;
1100 ent.rec_fid = tgt_fid;
1102 /* Drop cache before re-create it. */
1103 dt_object_put_nocache(env, tgt_obj);
1104 tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
1106 if (IS_ERR_OR_NULL(tgt_obj))
1107 GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
1109 LASSERT(!dt_object_exists(tgt_obj));
1111 /* T2: create new index and insert new name entry. */
1112 th = dt_trans_create(env, dev);
1114 GOTO(out, rc = PTR_ERR(th));
1116 rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
1120 rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
1121 (const struct dt_key *)name, th);
1125 rc = dt_trans_start_local(env, dev, th);
1129 dt_write_lock(env, tgt_obj, 0);
1130 rc = dt_create(env, tgt_obj, la, NULL, dof, th);
1131 dt_write_unlock(env, tgt_obj);
1135 rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
1136 (const struct dt_key *)name, th, 1);
1137 dt_trans_stop(env, dev, th);
1138 /* Some index name may have been inserted by the OSD
1139 * automatically when creating the index object. */
1140 if (unlikely(rc == -EEXIST))
1145 /* The new index will register via index_try. */
1146 rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
/* Tn loop: read the pairs back in bufsize-bounded batches and insert
 * each pair in its own small transaction. */
1151 count = le32_to_cpu(header->libh_count);
1152 while (!rc && count > 0) {
1153 int size = pairsize * count;
1157 if (size > bufsize) {
1158 items = bufsize / pairsize;
1159 size = pairsize * items;
1164 rc = dt_record_read(env, bak_obj, &lbuf, &pos);
1165 for (i = 0; i < items && !rc; i++) {
1166 void *key = &buf[i * pairsize];
1167 void *rec = &buf[i * pairsize + keysize];
1169 /* Tn: restore the records. */
1170 th = dt_trans_create(env, dev);
1172 GOTO(out, rc = -ENOMEM);
1174 rc = dt_declare_insert(env, tgt_obj, rec, key, th);
1178 rc = dt_trans_start_local(env, dev, th);
1182 rc = dt_insert(env, tgt_obj, rec, key, th, 1);
1183 if (unlikely(rc == -EEXIST))
1186 dt_trans_stop(env, dev, th);
1195 dt_trans_stop(env, dev, th);
1196 if (rc && registered)
1197 /* Degister the index to avoid overwriting the backup. */
1198 lustre_index_degister(head, lock, tgt_fid);
1201 if (!IS_ERR_OR_NULL(tgt_obj))
1202 dt_object_put_nocache(env, tgt_obj);
1203 if (!IS_ERR_OR_NULL(bak_obj))
1204 dt_object_put_nocache(env, bak_obj);
1205 if (!IS_ERR_OR_NULL(parent_obj))
1206 dt_object_put_nocache(env, parent_obj);
1209 EXPORT_SYMBOL(lustre_index_restore);