OSD_LPF_OID = 19UL,
REPLY_DATA_OID = 21UL,
ACCT_PROJECT_OID = 22UL,
+ INDEX_BACKUP_OID = 4116UL,
OFD_LAST_GROUP_OID = 4117UL,
LLOG_CATALOGS_OID = 4118UL,
MGS_CONFIGS_OID = 4119UL,
OLF_SHOW_NAME = 0x0004,
OLF_NO_OI = 0x0008,
OLF_IDX_IN_FID = 0x0010,
+ OLF_NOT_BACKUP = 0x0020,
};
/* There are some overhead to detect OI inconsistency automatically
os_full_scrub:1;
};
+#define INDEX_BACKUP_MAGIC_V1 0x1E41F208
+#define INDEX_BACKUP_BUFSIZE (4096 * 4)
+
+enum lustre_index_backup_policy {
+	/* By default, do not backup the index */
+	LIBP_NONE	= 0,
+
+	/* Backup the dirty index objects when umount */
+	LIBP_AUTO	= 1,
+};
+
+/* On-disk header of an index backup file. Fields are stored little-endian
+ * (see lustre_index_backup_make_header()); the packed key/record pairs
+ * follow the header in the same file. */
+struct lustre_index_backup_header {
+	__u32 libh_magic;	/* INDEX_BACKUP_MAGIC_V1 */
+	__u32 libh_count;	/* number of key/record pairs that follow */
+	__u32 libh_keysize;	/* bytes per key */
+	__u32 libh_recsize;	/* bytes per record */
+	struct lu_fid libh_owner;	/* FID of the backed-up index */
+	__u64 libh_pad[60]; /* keep header 512 bytes aligned */
+};
+
+/* In-memory descriptor for one index object registered for backup;
+ * linked on the per-device backup list, kept sorted by FID (see
+ * lustre_index_register()). */
+struct lustre_index_backup_unit {
+	struct list_head libu_link;
+	struct lu_fid libu_fid;		/* FID of the index to back up */
+	__u32 libu_keysize;
+	__u32 libu_recsize;
+};
+
+/* In-memory descriptor for one index object to restore from backup. */
+struct lustre_index_restore_unit {
+	struct list_head liru_link;
+	struct lu_fid liru_pfid;	/* parent directory FID */
+	struct lu_fid liru_cfid;	/* FID of the index to restore */
+	__u64 liru_clid;	/* caller-supplied child object id */
+	int liru_len;		/* total allocated size of this unit */
+	char liru_name[0];	/* NUL-terminated name, flexible tail */
+};
+
void scrub_file_init(struct lustre_scrub *scrub, __u8 *uuid);
void scrub_file_reset(struct lustre_scrub *scrub, __u8 *uuid, __u64 flags);
int scrub_file_load(const struct lu_env *env, struct lustre_scrub *scrub);
void scrub_stop(struct lustre_scrub *scrub);
void scrub_dump(struct seq_file *m, struct lustre_scrub *scrub);
+/* Allocate a restore unit for <pfid, cfid, name> and append it to @head;
+ * @child is the caller-supplied child object id. */
+int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
+		    const struct lu_fid *cfid, __u64 child,
+		    const char *name, int namelen);
+
+/* Add the index @fid (fixed @keysize/@recsize) to the per-device backup
+ * list protected by @lock; refused once @guard has been raised. */
+int lustre_index_register(struct dt_device *dev, const char *devname,
+			  struct list_head *head, spinlock_t *lock, int *guard,
+			  const struct lu_fid *fid,
+			  __u32 keysize, __u32 recsize);
+
+/* Drain the registered list; dump each index to disk when @backup. */
+void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
+			 const char *devname, struct list_head *head,
+			 spinlock_t *lock, int *guard, bool backup);
+/* Rebuild the index @tgt_fid (named @name under @parent_fid) from the
+ * backup file @bak_fid. */
+int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
+			 const struct lu_fid *parent_fid,
+			 const struct lu_fid *tgt_fid,
+			 const struct lu_fid *bak_fid, const char *name,
+			 struct list_head *head, spinlock_t *lock,
+			 char *buf, int bufsize);
+
+/* Compose the backup file name for @fid: DFID without braces plus the
+ * ".lbx" suffix. */
+static inline void lustre_fid2lbx(char *buf, const struct lu_fid *fid, int len)
+{
+	snprintf(buf, len, DFID_NOBRACE".lbx", PFID(fid));
+}
+
static inline const char *osd_scrub2name(struct lustre_scrub *scrub)
{
return scrub->os_name;
#define OBD_FAIL_OSD_COMPAT_NO_ENTRY 0x196
#define OBD_FAIL_OSD_OST_EA_FID_SET 0x197
#define OBD_FAIL_OSD_NO_OI_ENTRY 0x198
+#define OBD_FAIL_OSD_INDEX_CRASH 0x199
#define OBD_FAIL_OST 0x200
#define OBD_FAIL_OST_CONNECT_NET 0x201
#define LFSCK_LAYOUT "lfsck_layout"
#define LFSCK_NAMESPACE "lfsck_namespace"
#define REMOTE_PARENT_DIR "REMOTE_PARENT_DIR"
+#define INDEX_BACKUP_DIR "index_backup"
/****************** persistent mount data *********************/
* under /O/<seq>/d<x>. */
LMAC_STRIPE_INFO = 0x00000010, /* stripe info in the LMA EA. */
LMAC_COMP_INFO = 0x00000020, /* Component info in the LMA EA. */
+ LMAC_IDX_BACKUP = 0x00000040, /* Has index backup. */
};
/**
int mgs_fs_cleanup(const struct lu_env *env, struct mgs_device *mgs)
{
+ struct lustre_cfg_bufs bufs;
+ struct lustre_cfg *lcfg;
+
+ /* For the MGS on independent device from MDT, it notifies the lower
+ * layer OSD to backup index before the umount via LCFG_PRE_CLEANUP. */
+ lustre_cfg_bufs_reset(&bufs, mgs->mgs_obd->obd_name);
+ lustre_cfg_bufs_set_string(&bufs, 1, NULL);
+ OBD_ALLOC(lcfg, lustre_cfg_len(bufs.lcfg_bufcount, bufs.lcfg_buflen));
+ if (!lcfg) {
+ CERROR("%s: failed to trigger LCFG_PRE_CLEANUP\n",
+ mgs->mgs_obd->obd_name);
+ } else {
+ struct lu_device *l = &mgs->mgs_bottom->dd_lu_dev;
+
+ lustre_cfg_init(lcfg, LCFG_PRE_CLEANUP, &bufs);
+ l->ld_ops->ldo_process_config(env, l, lcfg);
+ OBD_FREE(lcfg, lustre_cfg_len(lcfg->lcfg_bufcount,
+ lcfg->lcfg_buflens));
+ }
+
if (mgs->mgs_configs_dir) {
dt_object_put(env, mgs->mgs_configs_dir);
mgs->mgs_configs_dir = NULL;
#include <linux/kthread.h>
#include <lustre_scrub.h>
#include <lustre_lib.h>
+#include <lustre_fid.h>
static inline struct dt_device *scrub_obj2dev(struct dt_object *obj)
{
up_read(&scrub->os_rwsem);
}
EXPORT_SYMBOL(scrub_dump);
+
+/* Allocate and queue a restore unit describing one index object to be
+ * rebuilt from its backup.
+ *
+ * \param head		list the new unit is appended to
+ * \param pfid		FID of the parent directory
+ * \param cfid		FID of the index object
+ * \param child		caller-supplied child object id
+ * \param name		name entry in the parent (copied, NUL-terminated)
+ * \param namelen	length of @name in bytes
+ *
+ * \retval 0 on success, -ENOMEM on allocation failure
+ */
+int lustre_liru_new(struct list_head *head, const struct lu_fid *pfid,
+		    const struct lu_fid *cfid, __u64 child,
+		    const char *name, int namelen)
+{
+	struct lustre_index_restore_unit *liru;
+	int len = sizeof(*liru) + namelen + 1;	/* +1 for trailing NUL */
+
+	OBD_ALLOC(liru, len);
+	if (!liru)
+		return -ENOMEM;
+
+	INIT_LIST_HEAD(&liru->liru_link);
+	liru->liru_pfid = *pfid;
+	liru->liru_cfid = *cfid;
+	liru->liru_clid = child;
+	liru->liru_len = len;
+	memcpy(liru->liru_name, name, namelen);
+	liru->liru_name[namelen] = 0;
+	list_add_tail(&liru->liru_link, head);
+
+	return 0;
+}
+EXPORT_SYMBOL(lustre_liru_new);
+
+/* Register an index object for backup-on-umount.
+ *
+ * The unit is inserted into @head keeping the list sorted by FID
+ * ascending (the reverse walk below finds the insertion point), so
+ * duplicates are detected during the same pass.
+ *
+ * \retval 0	registered
+ * \retval 1	skipped: device is read-only, backup already stopped
+ *		(@guard set), or @fid already registered with the same
+ *		key/record sizes
+ * \retval -ENOMEM on allocation failure
+ */
+int lustre_index_register(struct dt_device *dev, const char *devname,
+			  struct list_head *head, spinlock_t *lock, int *guard,
+			  const struct lu_fid *fid,
+			  __u32 keysize, __u32 recsize)
+{
+	struct lustre_index_backup_unit *libu, *pos;
+	int rc = 0;
+	ENTRY;
+
+	if (dev->dd_rdonly || *guard)
+		RETURN(1);
+
+	/* Allocate before taking the spinlock. */
+	OBD_ALLOC_PTR(libu);
+	if (!libu)
+		RETURN(-ENOMEM);
+
+	INIT_LIST_HEAD(&libu->libu_link);
+	libu->libu_keysize = keysize;
+	libu->libu_recsize = recsize;
+	libu->libu_fid = *fid;
+
+	spin_lock(lock);
+	/* Re-check under the lock: backup may have started meanwhile. */
+	if (unlikely(*guard)) {
+		spin_unlock(lock);
+		OBD_FREE_PTR(libu);
+
+		RETURN(1);
+	}
+
+	list_for_each_entry_reverse(pos, head, libu_link) {
+		rc = lu_fid_cmp(&pos->libu_fid, fid);
+		if (rc < 0) {
+			/* First entry smaller than @fid: insert after it. */
+			list_add(&libu->libu_link, &pos->libu_link);
+			spin_unlock(lock);
+
+			RETURN(0);
+		}
+
+		if (!rc) {
+			/* Registered already. But the former registered one
+			 * has different keysize/recsize. It may because that
+			 * the former values are from disk and corrupted, then
+			 * replace it with new values. */
+			if (unlikely(keysize != pos->libu_keysize ||
+				     recsize != pos->libu_recsize)) {
+				CWARN("%s: the index "DFID" has registered "
+				      "with %u/%u, may be invalid, replace "
+				      "with %u/%u\n",
+				      devname, PFID(fid), pos->libu_keysize,
+				      pos->libu_recsize, keysize, recsize);
+
+				pos->libu_keysize = keysize;
+				pos->libu_recsize = recsize;
+			} else {
+				rc = 1;
+			}
+
+			spin_unlock(lock);
+			OBD_FREE_PTR(libu);
+
+			RETURN(rc);
+		}
+	}
+
+	/* Smaller than every existing entry (or list empty): new head. */
+	list_add(&libu->libu_link, head);
+	spin_unlock(lock);
+
+	RETURN(0);
+}
+EXPORT_SYMBOL(lustre_index_register);
+
+/* Remove @fid from the backup list (if present) and free its unit.
+ * The list is sorted by FID ascending (see lustre_index_register()),
+ * so the reverse walk can stop at the first entry smaller than @fid. */
+static void lustre_index_degister(struct list_head *head, spinlock_t *lock,
+				  const struct lu_fid *fid)
+{
+	struct lustre_index_backup_unit *libu;
+	int rc = -ENOENT;
+
+	spin_lock(lock);
+	list_for_each_entry_reverse(libu, head, libu_link) {
+		rc = lu_fid_cmp(&libu->libu_fid, fid);
+		/* NOT registered. */
+		if (rc < 0)
+			break;
+
+		if (!rc) {
+			list_del(&libu->libu_link);
+			break;
+		}
+	}
+	spin_unlock(lock);
+
+	/* rc == 0 only when the entry was found and unlinked above. */
+	if (!rc)
+		OBD_FREE_PTR(libu);
+}
+
+/* Fill in the on-disk backup file header; all fields little-endian. */
+static void
+lustre_index_backup_make_header(struct lustre_index_backup_header *header,
+				__u32 keysize, __u32 recsize,
+				const struct lu_fid *fid, __u32 count)
+{
+	memset(header, 0, sizeof(*header));
+	header->libh_magic = cpu_to_le32(INDEX_BACKUP_MAGIC_V1);
+	header->libh_count = cpu_to_le32(count);
+	header->libh_keysize = cpu_to_le32(keysize);
+	header->libh_recsize = cpu_to_le32(recsize);
+	fid_cpu_to_le(&header->libh_owner, fid);
+}
+
+/* Write @bufsize bytes of packed key/record pairs to the backup file
+ * @obj at offset *pos within one local transaction; *pos is passed by
+ * pointer so dt_record_write() can advance it for the next chunk. */
+static int lustre_index_backup_body(const struct lu_env *env,
+				    struct dt_object *obj, loff_t *pos,
+				    void *buf, int bufsize)
+{
+	struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
+	struct thandle *th;
+	struct lu_buf lbuf = {
+		.lb_buf = buf,
+		.lb_len = bufsize
+	};
+	int rc;
+	ENTRY;
+
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		RETURN(PTR_ERR(th));
+
+	rc = dt_declare_record_write(env, obj, &lbuf, *pos, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_record_write(env, obj, &lbuf, pos, th);
+
+	GOTO(stop, rc);
+
+stop:
+	dt_trans_stop(env, dev, th);
+	return rc;
+}
+
+/* Write the backup file header at offset 0; if a previous, larger
+ * backup exists, punch the stale tail beyond the new valid size.
+ *
+ * @buf is scratch space shared by @la and @header: the attributes are
+ * consumed (la_size) before the header is composed, so the aliasing of
+ * the same buffer is safe. */
+static int lustre_index_backup_header(const struct lu_env *env,
+				      struct dt_object *obj,
+				      const struct lu_fid *tgt_fid,
+				      __u32 keysize, __u32 recsize,
+				      void *buf, int bufsize, int count)
+{
+	struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
+	struct lustre_index_backup_header *header = buf;
+	struct lu_attr *la = buf;
+	struct thandle *th;
+	struct lu_buf lbuf = {
+		.lb_buf = header,
+		.lb_len = sizeof(*header)
+	};
+	/* Total valid size of the backup file: header + all pairs. */
+	loff_t size = sizeof(*header) + (keysize + recsize) * count;
+	loff_t pos = 0;
+	int rc;
+	bool punch = false;
+	ENTRY;
+
+	LASSERT(sizeof(*la) <= bufsize);
+	LASSERT(sizeof(*header) <= bufsize);
+
+	rc = dt_attr_get(env, obj, la);
+	if (rc)
+		RETURN(rc);
+
+	/* Previous backup was larger: truncate the leftover tail. */
+	if (la->la_size > size)
+		punch = true;
+
+	lustre_index_backup_make_header(header, keysize, recsize,
+					tgt_fid, count);
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		RETURN(PTR_ERR(th));
+
+	rc = dt_declare_record_write(env, obj, &lbuf, pos, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	if (punch) {
+		rc = dt_declare_punch(env, obj, size, OBD_OBJECT_EOF, th);
+		if (rc)
+			GOTO(stop, rc);
+	}
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_record_write(env, obj, &lbuf, &pos, th);
+	if (!rc && punch)
+		rc = dt_punch(env, obj, size, OBD_OBJECT_EOF, th);
+
+	GOTO(stop, rc);
+
+stop:
+	dt_trans_stop(env, dev, th);
+	return rc;
+}
+
+/* Mark the index object @obj as backed up by setting LMAC_IDX_BACKUP in
+ * its LMA xattr, creating the LMA if it does not exist yet.
+ *
+ * @buf is scratch space of at least sizeof(struct lustre_ost_attrs)
+ * bytes used to read and rewrite the xattr.
+ *
+ * \retval 0 on success (or if the flag was already set)
+ * \retval negative errno on failure
+ */
+static int lustre_index_update_lma(const struct lu_env *env,
+				   struct dt_object *obj,
+				   void *buf, int bufsize)
+{
+	struct dt_device *dev = lu2dt_dev(obj->do_lu.lo_dev);
+	struct lustre_mdt_attrs *lma = buf;
+	struct lu_buf lbuf = {
+		.lb_buf = lma,
+		.lb_len = sizeof(struct lustre_ost_attrs)
+	};
+	struct thandle *th;
+	int fl = LU_XATTR_REPLACE;
+	int rc;
+	ENTRY;
+
+	LASSERT(bufsize >= lbuf.lb_len);
+
+	rc = dt_xattr_get(env, obj, &lbuf, XATTR_NAME_LMA);
+	if (unlikely(rc == -ENODATA)) {
+		/* No LMA yet: create a fresh one with the flag set. */
+		fl = LU_XATTR_CREATE;
+		lustre_lma_init(lma, lu_object_fid(&obj->do_lu),
+				LMAC_IDX_BACKUP, 0);
+		rc = sizeof(*lma);
+	} else if (rc < (int)sizeof(*lma)) {
+		/* The cast keeps the comparison signed so that negative
+		 * errors from dt_xattr_get() other than -ENODATA are
+		 * caught here instead of being promoted to a huge
+		 * unsigned value and falling through to the swab. */
+		RETURN(rc < 0 ? rc : -EFAULT);
+	} else {
+		lustre_lma_swab(lma);
+		if (lma->lma_compat & LMAC_IDX_BACKUP)
+			RETURN(0);
+
+		lma->lma_compat |= LMAC_IDX_BACKUP;
+	}
+
+	/* Back to disk endianness; rc holds the xattr size to write. */
+	lustre_lma_swab(lma);
+	lbuf.lb_len = rc;
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		/* Was "RETURN(rc)", leaking the positive xattr size as a
+		 * "success" return on transaction allocation failure. */
+		RETURN(PTR_ERR(th));
+
+	rc = dt_declare_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_xattr_set(env, obj, &lbuf, XATTR_NAME_LMA, fl, th);
+
+	GOTO(stop, rc);
+
+stop:
+	dt_trans_stop(env, dev, th);
+	return rc;
+}
+
+/* Back up a single registered index object.
+ *
+ * Iterate the index @libu->libu_fid, pack its fixed-size key/record
+ * pairs into @buf and append them (after a header-sized hole) to the
+ * backup file "<FID>.lbx" under @parent; then write the header at
+ * offset 0 and flag the source index with LMAC_IDX_BACKUP.
+ *
+ * \retval 0 on success (also when the index object does not exist)
+ * \retval negative errno on failure
+ */
+static int lustre_index_backup_one(const struct lu_env *env,
+				   struct local_oid_storage *los,
+				   struct dt_object *parent,
+				   struct lustre_index_backup_unit *libu,
+				   char *buf, int bufsize)
+{
+	struct dt_device *dev = scrub_obj2dev(parent);
+	struct dt_object *tgt_obj = NULL;
+	struct dt_object *bak_obj = NULL;
+	const struct dt_it_ops *iops;
+	struct dt_it *di;
+	/* Record body starts right after the later-written header. */
+	loff_t pos = sizeof(struct lustre_index_backup_header);
+	int count = 0;
+	int size = 0;
+	int rc;
+	ENTRY;
+
+	tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+					     &libu->libu_fid, NULL));
+	if (IS_ERR_OR_NULL(tgt_obj))
+		GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
+
+	if (!dt_object_exists(tgt_obj))
+		GOTO(out, rc = 0);
+
+	if (!tgt_obj->do_index_ops) {
+		struct dt_index_features feat;
+
+		/* Not set up as an index yet: use the registered sizes. */
+		feat.dif_flags = DT_IND_UPDATE;
+		feat.dif_keysize_min = libu->libu_keysize;
+		feat.dif_keysize_max = libu->libu_keysize;
+		feat.dif_recsize_min = libu->libu_recsize;
+		feat.dif_recsize_max = libu->libu_recsize;
+		feat.dif_ptrsize = 4;
+		rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, &feat);
+		if (rc)
+			GOTO(out, rc);
+	}
+
+	/* @buf briefly holds the "<FID>.lbx" file name. */
+	lustre_fid2lbx(buf, &libu->libu_fid, bufsize);
+	bak_obj = local_file_find_or_create(env, los, parent, buf,
+					    S_IFREG | S_IRUGO | S_IWUSR);
+	if (IS_ERR_OR_NULL(bak_obj))
+		GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
+
+	iops = &tgt_obj->do_index_ops->dio_it;
+	di = iops->init(env, tgt_obj, 0);
+	if (IS_ERR(di))
+		GOTO(out, rc = PTR_ERR(di));
+
+	/* load() > 0: cursor already on a record; load() == 0: a next()
+	 * is needed to reach the first record. */
+	rc = iops->load(env, di, 0);
+	if (!rc)
+		rc = iops->next(env, di);
+	else if (rc > 0)
+		rc = 0;
+
+	while (!rc) {
+		void *key;
+		void *rec;
+
+		key = iops->key(env, di);
+		memcpy(&buf[size], key, libu->libu_keysize);
+		size += libu->libu_keysize;
+		rec = &buf[size];
+		rc = iops->rec(env, di, rec, 0);
+		if (rc)
+			GOTO(fini, rc);
+
+		size += libu->libu_recsize;
+		count++;
+		/* Flush the buffer when the next pair would not fit. */
+		if (size + libu->libu_keysize + libu->libu_recsize > bufsize) {
+			rc = lustre_index_backup_body(env, bak_obj, &pos,
+						      buf, size);
+			if (rc)
+				GOTO(fini, rc);
+
+			size = 0;
+		}
+
+		rc = iops->next(env, di);
+	}
+
+	/* next() returns positive at end-of-index; flush the tail. */
+	if (rc >= 0 && size > 0)
+		rc = lustre_index_backup_body(env, bak_obj, &pos, buf, size);
+
+	if (rc < 0)
+		GOTO(fini, rc);
+
+	rc = lustre_index_backup_header(env, bak_obj, &libu->libu_fid,
+					libu->libu_keysize, libu->libu_recsize,
+					buf, bufsize, count);
+	if (!rc)
+		rc = lustre_index_update_lma(env, tgt_obj, buf, bufsize);
+
+	if (!rc && OBD_FAIL_CHECK(OBD_FAIL_OSD_INDEX_CRASH)) {
+		/* Fault injection: zero the first 512 bytes of the source
+		 * index to exercise the restore path in tests. */
+		LASSERT(bufsize >= 512);
+
+		pos = 0;
+		memset(buf, 0, 512);
+		lustre_index_backup_body(env, tgt_obj, &pos, buf, 512);
+	}
+
+	GOTO(fini, rc);
+
+fini:
+	iops->fini(env, di);
+out:
+	if (!IS_ERR_OR_NULL(tgt_obj))
+		dt_object_put_nocache(env, tgt_obj);
+	if (!IS_ERR_OR_NULL(bak_obj))
+		dt_object_put_nocache(env, bak_obj);
+	return rc;
+}
+
+/* Drain the registered index list and, when @backup is true, dump each
+ * listed index object into the "index_backup" directory.
+ *
+ * The @guard flag is raised first so later lustre_index_register()
+ * calls are refused; the list is always emptied and the units freed,
+ * even when the actual backup is skipped or fails.
+ */
+void lustre_index_backup(const struct lu_env *env, struct dt_device *dev,
+			 const char *devname, struct list_head *head,
+			 spinlock_t *lock, int *guard, bool backup)
+{
+	struct lustre_index_backup_unit *libu;
+	struct local_oid_storage *los = NULL;
+	struct dt_object *parent = NULL;
+	char *buf = NULL;
+	struct lu_fid fid;
+	int rc;
+	ENTRY;
+
+	if (dev->dd_rdonly || *guard)
+		RETURN_EXIT;
+
+	spin_lock(lock);
+	*guard = 1;
+	spin_unlock(lock);
+
+	if (list_empty(head))
+		RETURN_EXIT;
+
+	/* Handle kinds of failures during mount process. */
+	if (!dev->dd_lu_dev.ld_site || !dev->dd_lu_dev.ld_site->ls_top_dev)
+		backup = false;
+
+	if (backup) {
+		OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+		if (!buf) {
+			backup = false;
+			goto scan;
+		}
+
+		lu_local_obj_fid(&fid, INDEX_BACKUP_OID);
+		parent = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+						    &fid, NULL));
+		if (IS_ERR_OR_NULL(parent)) {
+			CERROR("%s: failed to locate backup dir: rc = %ld\n",
+			       devname, parent ? PTR_ERR(parent) : -ENOENT);
+			backup = false;
+			goto scan;
+		}
+
+		lu_local_name_obj_fid(&fid, 1);
+		rc = local_oid_storage_init(env, dev, &fid, &los);
+		if (rc) {
+			CERROR("%s: failed to init local storage: rc = %d\n",
+			       devname, rc);
+			backup = false;
+		}
+	}
+
+scan:
+	/* Pop units one at a time, dropping the lock around the (possibly
+	 * heavy) per-index backup I/O. */
+	spin_lock(lock);
+	while (!list_empty(head)) {
+		libu = list_entry(head->next,
+				  struct lustre_index_backup_unit, libu_link);
+		list_del_init(&libu->libu_link);
+		spin_unlock(lock);
+
+		if (backup) {
+			rc = lustre_index_backup_one(env, los, parent, libu,
+						     buf, INDEX_BACKUP_BUFSIZE);
+			CDEBUG(D_WARNING, "%s: backup index "DFID": rc = %d\n",
+			       devname, PFID(&libu->libu_fid), rc);
+		}
+
+		OBD_FREE_PTR(libu);
+		spin_lock(lock);
+	}
+	spin_unlock(lock);
+
+	if (los)
+		local_oid_storage_fini(env, los);
+	/* @parent may hold an ERR_PTR from lu_object_find_slice(); a bare
+	 * NULL check would pass the error pointer to put_nocache. */
+	if (!IS_ERR_OR_NULL(parent))
+		dt_object_put_nocache(env, parent);
+	if (buf)
+		OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+
+	EXIT;
+}
+EXPORT_SYMBOL(lustre_index_backup);
+
+/* Rebuild the index object @tgt_fid from its backup file @bak_fid.
+ *
+ * Procedure:
+ *   T1: delete the name entry @name from @parent_fid and destroy the
+ *       (corrupted) old index object;
+ *   T2: re-create the index with the key/record sizes taken from the
+ *       backup header and re-insert the name entry;
+ *   Tn: replay the backed-up key/record pairs, one small local
+ *       transaction per record.
+ *
+ * @buf (checked via LASSERT below) provides both the scratch structures
+ * carved from its head and the chunk buffer for reading back records.
+ *
+ * \retval 0 on success, negative errno on failure
+ */
+int lustre_index_restore(const struct lu_env *env, struct dt_device *dev,
+			 const struct lu_fid *parent_fid,
+			 const struct lu_fid *tgt_fid,
+			 const struct lu_fid *bak_fid, const char *name,
+			 struct list_head *head, spinlock_t *lock,
+			 char *buf, int bufsize)
+{
+	struct dt_object *parent_obj = NULL;
+	struct dt_object *tgt_obj = NULL;
+	struct dt_object *bak_obj = NULL;
+	struct lustre_index_backup_header *header;
+	struct dt_index_features *feat;
+	struct dt_object_format *dof;
+	struct lu_attr *la;
+	struct thandle *th;
+	struct lu_object_conf conf;
+	struct dt_insert_rec ent;
+	struct lu_buf lbuf;
+	struct lu_fid tfid;
+	loff_t pos = 0;
+	__u32 keysize;
+	__u32 recsize;
+	__u32 pairsize;
+	int count;
+	int rc;
+	bool registered = false;
+	ENTRY;
+
+	LASSERT(bufsize >= sizeof(*la) + sizeof(*dof) +
+		sizeof(*feat) + sizeof(*header));
+
+	/* Carve the scratch structures out of the head of @buf. */
+	memset(buf, 0, bufsize);
+	la = (struct lu_attr *)buf;
+	dof = (void *)la + sizeof(*la);
+	feat = (void *)dof + sizeof(*dof);
+	header = (void *)feat + sizeof(*feat);
+	lbuf.lb_buf = header;
+	lbuf.lb_len = sizeof(*header);
+
+	tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+					     tgt_fid, NULL));
+	if (IS_ERR_OR_NULL(tgt_obj))
+		GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
+
+	bak_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+					     bak_fid, NULL));
+	if (IS_ERR_OR_NULL(bak_obj))
+		GOTO(out, rc = bak_obj ? PTR_ERR(bak_obj) : -ENOENT);
+
+	if (!dt_object_exists(bak_obj))
+		GOTO(out, rc = -ENOENT);
+
+	parent_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+						parent_fid, NULL));
+	if (IS_ERR_OR_NULL(parent_obj))
+		GOTO(out, rc = parent_obj ? PTR_ERR(parent_obj) : -ENOENT);
+
+	LASSERT(dt_object_exists(parent_obj));
+
+	if (unlikely(!dt_try_as_dir(env, parent_obj)))
+		GOTO(out, rc = -ENOTDIR);
+
+	/* Preserve mode/uid/gid of the old index for re-creation. */
+	rc = dt_attr_get(env, tgt_obj, la);
+	if (rc)
+		GOTO(out, rc);
+
+	rc = dt_record_read(env, bak_obj, &lbuf, &pos);
+	if (rc)
+		GOTO(out, rc);
+
+	/* Sanity check the backup: magic and owner FID must match. */
+	if (le32_to_cpu(header->libh_magic) != INDEX_BACKUP_MAGIC_V1)
+		GOTO(out, rc = -EINVAL);
+
+	fid_le_to_cpu(&tfid, &header->libh_owner);
+	if (unlikely(!lu_fid_eq(tgt_fid, &tfid)))
+		GOTO(out, rc = -EINVAL);
+
+	keysize = le32_to_cpu(header->libh_keysize);
+	recsize = le32_to_cpu(header->libh_recsize);
+	pairsize = keysize + recsize;
+
+	memset(feat, 0, sizeof(*feat));
+	feat->dif_flags = DT_IND_UPDATE;
+	feat->dif_keysize_min = feat->dif_keysize_max = keysize;
+	feat->dif_recsize_min = feat->dif_recsize_max = recsize;
+	feat->dif_ptrsize = 4;
+
+	/* T1: remove old name entry and destroy old index. */
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		GOTO(out, rc = PTR_ERR(th));
+
+	rc = dt_declare_delete(env, parent_obj,
+			       (const struct dt_key *)name, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_declare_destroy(env, tgt_obj, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_delete(env, parent_obj, (const struct dt_key *)name, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	dt_write_lock(env, tgt_obj, 0);
+	rc = dt_destroy(env, tgt_obj, th);
+	dt_write_unlock(env, tgt_obj);
+	dt_trans_stop(env, dev, th);
+	if (rc)
+		GOTO(out, rc);
+
+	la->la_valid = LA_MODE | LA_UID | LA_GID;
+	conf.loc_flags = LOC_F_NEW;
+	dof->u.dof_idx.di_feat = feat;
+	dof->dof_type = DFT_INDEX;
+	ent.rec_type = S_IFREG;
+	ent.rec_fid = tgt_fid;
+
+	/* Drop cache before re-create it. */
+	dt_object_put_nocache(env, tgt_obj);
+	tgt_obj = lu2dt(lu_object_find_slice(env, &dev->dd_lu_dev,
+					     tgt_fid, &conf));
+	if (IS_ERR_OR_NULL(tgt_obj))
+		GOTO(out, rc = tgt_obj ? PTR_ERR(tgt_obj) : -ENOENT);
+
+	LASSERT(!dt_object_exists(tgt_obj));
+
+	/* T2: create new index and insert new name entry. */
+	th = dt_trans_create(env, dev);
+	if (IS_ERR(th))
+		GOTO(out, rc = PTR_ERR(th));
+
+	rc = dt_declare_create(env, tgt_obj, la, NULL, dof, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_declare_insert(env, parent_obj, (const struct dt_rec *)&ent,
+			       (const struct dt_key *)name, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_trans_start_local(env, dev, th);
+	if (rc)
+		GOTO(stop, rc);
+
+	dt_write_lock(env, tgt_obj, 0);
+	rc = dt_create(env, tgt_obj, la, NULL, dof, th);
+	dt_write_unlock(env, tgt_obj);
+	if (rc)
+		GOTO(stop, rc);
+
+	rc = dt_insert(env, parent_obj, (const struct dt_rec *)&ent,
+		       (const struct dt_key *)name, th, 1);
+	dt_trans_stop(env, dev, th);
+	/* Some index name may have been inserted by the OSD automatically
+	 * when creating the index object. */
+	if (unlikely(rc == -EEXIST))
+		rc = 0;
+	if (rc)
+		GOTO(out, rc);
+
+	/* The new index will register via index_try. */
+	rc = tgt_obj->do_ops->do_index_try(env, tgt_obj, feat);
+	if (rc)
+		GOTO(out, rc);
+
+	registered = true;
+	count = le32_to_cpu(header->libh_count);
+	while (!rc && count > 0) {
+		int size = pairsize * count;
+		int items = count;
+		int i;
+
+		/* Read back the records in @bufsize-limited chunks. */
+		if (size > bufsize) {
+			items = bufsize / pairsize;
+			size = pairsize * items;
+		}
+
+		lbuf.lb_buf = buf;
+		lbuf.lb_len = size;
+		rc = dt_record_read(env, bak_obj, &lbuf, &pos);
+		for (i = 0; i < items && !rc; i++) {
+			void *key = &buf[i * pairsize];
+			void *rec = &buf[i * pairsize + keysize];
+
+			/* Tn: restore the records. */
+			th = dt_trans_create(env, dev);
+			/* dt_trans_create() returns ERR_PTR, never NULL;
+			 * the former "if (!th)" check let failures slip
+			 * through as an error pointer. */
+			if (IS_ERR(th))
+				GOTO(out, rc = PTR_ERR(th));
+
+			rc = dt_declare_insert(env, tgt_obj, rec, key, th);
+			if (rc)
+				GOTO(stop, rc);
+
+			rc = dt_trans_start_local(env, dev, th);
+			if (rc)
+				GOTO(stop, rc);
+
+			rc = dt_insert(env, tgt_obj, rec, key, th, 1);
+			if (unlikely(rc == -EEXIST))
+				rc = 0;
+
+			dt_trans_stop(env, dev, th);
+		}
+
+		count -= items;
+	}
+
+	GOTO(out, rc);
+
+stop:
+	dt_trans_stop(env, dev, th);
+	if (rc && registered)
+		/* Degister the index to avoid overwriting the backup. */
+		lustre_index_degister(head, lock, tgt_fid);
+
+out:
+	if (!IS_ERR_OR_NULL(tgt_obj))
+		dt_object_put_nocache(env, tgt_obj);
+	if (!IS_ERR_OR_NULL(bak_obj))
+		dt_object_put_nocache(env, bak_obj);
+	if (!IS_ERR_OR_NULL(parent_obj))
+		dt_object_put_nocache(env, parent_obj);
+	return rc;
+}
+EXPORT_SYMBOL(lustre_index_restore);
top->ld_ops->ldo_process_config(env, top, lcfg);
OBD_FREE(lcfg, lustre_cfg_len(lcfg->lcfg_bufcount, lcfg->lcfg_buflens));
+ if (m->ofd_los != NULL) {
+ local_oid_storage_fini(env, m->ofd_los);
+ m->ofd_los = NULL;
+ }
+
lu_site_purge(env, top->ld_site, ~0);
if (!cfs_hash_is_empty(top->ld_site->ls_obj_hash)) {
LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_OTHER, NULL);
EXIT;
}
+/* Send LCFG_PRE_CLEANUP down the OFD stack so the bottom OSD can back
+ * up its dirty index objects before the device is cleaned up. */
+static void ofd_stack_pre_fini(const struct lu_env *env, struct ofd_device *m,
+			       struct lu_device *top)
+{
+	struct lustre_cfg_bufs bufs;
+	struct lustre_cfg *lcfg;
+	ENTRY;
+
+	LASSERT(top);
+
+	lustre_cfg_bufs_reset(&bufs, ofd_name(m));
+	/* Buffer 1 is set to NULL: LCFG_PRE_CLEANUP carries no payload. */
+	lustre_cfg_bufs_set_string(&bufs, 1, NULL);
+	OBD_ALLOC(lcfg, lustre_cfg_len(bufs.lcfg_bufcount, bufs.lcfg_buflen));
+	if (!lcfg) {
+		CERROR("%s: failed to trigger LCFG_PRE_CLEANUP\n", ofd_name(m));
+	} else {
+		lustre_cfg_init(lcfg, LCFG_PRE_CLEANUP, &bufs);
+		top->ld_ops->ldo_process_config(env, top, lcfg);
+		OBD_FREE(lcfg, lustre_cfg_len(lcfg->lcfg_bufcount,
+					      lcfg->lcfg_buflens));
+	}
+
+	EXIT;
+}
+
/* For interoperability, see mdt_interop_param[]. */
static struct cfg_interop_param ofd_interop_param[] = {
{ "ost.quota_type", NULL },
stop.ls_status = LS_PAUSED;
stop.ls_flags = 0;
lfsck_stop(env, m->ofd_osd, &stop);
+ ofd_stack_pre_fini(env, m, &m->ofd_dt_dev.dd_lu_dev);
target_recovery_fini(obd);
if (m->ofd_namespace != NULL)
ldlm_namespace_free_prior(m->ofd_namespace, NULL,
nm_config_file_deregister_tgt(env, obd->u.obt.obt_nodemap_config_file);
obd->u.obt.obt_nodemap_config_file = NULL;
- if (m->ofd_los != NULL) {
- local_oid_storage_fini(env, m->ofd_los);
- m->ofd_los = NULL;
- }
-
if (m->ofd_namespace != NULL) {
ldlm_namespace_free_post(m->ofd_namespace);
d->ld_obd->obd_namespace = m->ofd_namespace = NULL;
if (rc >= sizeof(o->od_svname))
RETURN(-E2BIG);
+ o->od_index_backup_stop = 0;
o->od_index = -1; /* -1 means index is invalid */
rc = server_name2index(o->od_svname, &o->od_index, NULL);
str = strstr(str, ":");
INIT_LIST_HEAD(&osl->osl_seq_list);
rwlock_init(&osl->osl_seq_list_lock);
sema_init(&osl->osl_seq_init_sem, 1);
+ INIT_LIST_HEAD(&dev->od_index_backup_list);
+ INIT_LIST_HEAD(&dev->od_index_restore_list);
+ spin_lock_init(&dev->od_lock);
+ dev->od_index_backup_policy = LIBP_NONE;
rc = dt_device_init(&dev->od_dt_dev, type);
if (rc == 0) {
rc = osd_mount(env, o, cfg);
break;
case LCFG_CLEANUP:
+	/* For the case LCFG_PRE_CLEANUP is not called in advance,
+	 * which may happen on failure during the mount process. */
+ osd_index_backup(env, o, false);
rc = osd_shutdown(env, o);
break;
case LCFG_PARAM: {
}
break;
}
+ case LCFG_PRE_CLEANUP:
+ osd_index_backup(env, o,
+ o->od_index_backup_policy != LIBP_NONE);
+ rc = 0;
+ break;
default:
rc = -ENOTTY;
}
#include <sys/dsl_prop.h>
#include <sys/sa_impl.h>
#include <sys/txg.h>
-
-static inline int osd_object_is_zap(dnode_t *dn)
-{
- return (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
- dn->dn_type == DMU_OT_USERGROUP_USED);
-}
+#include <lustre_scrub.h>
/* We don't actually have direct access to the zap_hashbits() function
* so just pretend like we do for now. If this ever breaks we can look at
* \retval 0 for success
* \retval negative error number on failure
*/
-static int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
- uint64_t oid, struct lu_fid *fid)
+int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+ uint64_t oid, struct lu_fid *fid)
{
struct objset *os = osd->od_os;
struct osd_thread_info *oti = osd_oti_get(env);
const struct dt_index_features *feat)
{
struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
+ const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
int rc = 0;
ENTRY;
else
GOTO(out, rc = -ENOTDIR);
} else if (unlikely(feat == &dt_acct_features)) {
- LASSERT(fid_is_acct(lu_object_fid(&dt->do_lu)));
+ LASSERT(fid_is_acct(fid));
dt->do_index_ops = &osd_acct_index_ops;
} else if (dt->do_index_ops == NULL) {
/* For index file, we don't support variable key & record sizes
obj->oo_recusize = 8;
}
dt->do_index_ops = &osd_index_ops;
+
+ if (feat == &dt_lfsck_layout_orphan_features ||
+ feat == &dt_lfsck_layout_dangling_features ||
+ feat == &dt_lfsck_namespace_features)
+ GOTO(out, rc = 0);
+
+ rc = osd_index_register(osd, fid, obj->oo_keysize,
+ obj->oo_recusize * obj->oo_recsize);
+ if (rc < 0)
+ CWARN("%s: failed to register index "DFID": rc = %d\n",
+ osd_name(osd), PFID(fid), rc);
+ else if (rc > 0)
+ rc = 0;
+ else
+ CDEBUG(D_LFSCK, "%s: index object "DFID
+ " (%u/%u/%u) registered\n",
+ osd_name(osd), PFID(fid), obj->oo_keysize,
+ obj->oo_recusize, obj->oo_recsize);
}
out:
#include <sys/zap.h>
#include <sys/dbuf.h>
#include <sys/dmu_objset.h>
+#include <lustre_scrub.h>
/**
* By design including kmem.h overrides the Linux slab interfaces to provide
struct lu_attr oti_la;
struct osa_attr oti_osa;
zap_attribute_t oti_za;
+ zap_attribute_t oti_za2;
dmu_object_info_t oti_doi;
struct luz_direntry oti_zde;
int oti_ins_cache_used;
struct lu_buf oti_xattr_lbuf;
zap_cursor_t oti_zc;
+ zap_cursor_t oti_zc2;
};
extern struct lu_context_key osd_key;
struct lprocfs_stats *od_stats;
uint64_t od_remote_parent_dir;
+ uint64_t od_index_backup_id;
uint64_t od_max_blksz;
uint64_t od_root;
uint64_t od_O_id;
od_in_init:1,
od_posix_acl:1;
unsigned int od_dnsize;
+ int od_index_backup_stop;
+ enum lustre_index_backup_policy od_index_backup_policy;
char od_mntdev[128];
char od_svname[128];
char od_uuid[16];
struct osd_otable_it *od_otable_it;
struct lustre_scrub od_scrub;
struct list_head od_ios_list;
+ struct list_head od_index_backup_list;
+ struct list_head od_index_restore_list;
+ spinlock_t od_lock;
};
enum osd_destroy_type {
extern unsigned int osd_oi_count;
/* osd_index.c */
+int osd_get_fid_by_oid(const struct lu_env *env, struct osd_device *osd,
+ uint64_t oid, struct lu_fid *fid);
int osd_index_try(const struct lu_env *env, struct dt_object *dt,
const struct dt_index_features *feat);
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
}
#endif
+/* True if the dnode is a ZAP usable as an index: either directory
+ * contents or the user/group usage accounting object. */
+static inline int osd_object_is_zap(dnode_t *dn)
+{
+	return (dn->dn_type == DMU_OT_DIRECTORY_CONTENTS ||
+		dn->dn_type == DMU_OT_USERGROUP_USED);
+}
+
/* XXX: f_ver is not counted, but may differ too */
static inline void osd_fid2str(char *buf, const struct lu_fid *fid, int len)
{
dmu_objset_disown((os), (tag))
#endif
+/* Register the index @fid on this OSD's backup list; see
+ * lustre_index_register() for the return-value convention. */
+static inline int
+osd_index_register(struct osd_device *osd, const struct lu_fid *fid,
+		   __u32 keysize, __u32 recsize)
+{
+	return lustre_index_register(&osd->od_dt_dev, osd_name(osd),
+				     &osd->od_index_backup_list, &osd->od_lock,
+				     &osd->od_index_backup_stop,
+				     fid, keysize, recsize);
+}
+
+/* Drain this OSD's registered index list; when @backup is true, dump
+ * each index into the backup directory first. */
+static inline void
+osd_index_backup(const struct lu_env *env, struct osd_device *osd, bool backup)
+{
+	struct lu_fid *fid = &osd_oti_get(env)->oti_fid;
+	int rc;
+
+	lu_local_obj_fid(fid, INDEX_BACKUP_OID);
+	/* Prime the FID->oid mapping for the backup directory; on failure
+	 * we still drain the list, just without writing any backup. */
+	rc = osd_idc_find_and_init_with_oid(env, osd, fid,
+					    osd->od_index_backup_id);
+	if (rc)
+		backup = false;
+
+	lustre_index_backup(env, &osd->od_dt_dev, osd_name(osd),
+			    &osd->od_index_backup_list, &osd->od_lock,
+			    &osd->od_index_backup_stop, backup);
+}
+
#endif /* _OSD_INTERNAL_H */
struct osd_device *dev = osd_dt_dev((struct dt_device *)m->private);
LASSERT(dev != NULL);
- if (unlikely(!dev->od_os))
+ if (!dev->od_os)
return -EINPROGRESS;
seq_printf(m, "%lld\n", dev->od_auto_scrub_interval);
__s64 val;
LASSERT(dev != NULL);
- if (unlikely(!dev->od_os))
+ if (!dev->od_os)
return -EINPROGRESS;
rc = lprocfs_str_to_s64(buffer, count, &val);
struct osd_device *dev = osd_dt_dev((struct dt_device *)m->private);
LASSERT(dev != NULL);
- if (unlikely(!dev->od_os))
+ if (!dev->od_os)
return -EINPROGRESS;
scrub_dump(m, &dev->od_scrub);
}
LPROC_SEQ_FOPS_WR_ONLY(zfs, osd_force_sync);
+/* Show the current index backup policy as its numeric value
+ * (enum lustre_index_backup_policy). */
+static int zfs_osd_index_backup_seq_show(struct seq_file *m, void *data)
+{
+	struct osd_device *dev = osd_dt_dev((struct dt_device *)m->private);
+
+	LASSERT(dev != NULL);
+	/* Device not mounted yet. */
+	if (!dev->od_os)
+		return -EINPROGRESS;
+
+	seq_printf(m, "%d\n", dev->od_index_backup_policy);
+	return 0;
+}
+
+/* Set the index backup policy from procfs. Only the values declared in
+ * enum lustre_index_backup_policy are accepted. */
+static ssize_t zfs_osd_index_backup_seq_write(struct file *file,
+					      const char __user *buffer,
+					      size_t count, loff_t *off)
+{
+	struct seq_file *m = file->private_data;
+	struct dt_device *dt = m->private;
+	struct osd_device *dev = osd_dt_dev(dt);
+	__s64 val;
+	int rc;
+
+	LASSERT(dev != NULL);
+	/* Device not mounted yet. */
+	if (!dev->od_os)
+		return -EINPROGRESS;
+
+	rc = lprocfs_str_to_s64(buffer, count, &val);
+	if (rc)
+		return rc;
+
+	/* Reject unknown policies instead of silently storing arbitrary
+	 * user input in the enum-typed field. */
+	if (val != LIBP_NONE && val != LIBP_AUTO)
+		return -EINVAL;
+
+	dev->od_index_backup_policy = val;
+	return count;
+}
+LPROC_SEQ_FOPS(zfs_osd_index_backup);
+
LPROC_SEQ_FOPS_RO_TYPE(zfs, dt_blksize);
LPROC_SEQ_FOPS_RO_TYPE(zfs, dt_kbytestotal);
LPROC_SEQ_FOPS_RO_TYPE(zfs, dt_kbytesfree);
.fops = &zfs_osd_mntdev_fops },
{ .name = "force_sync",
.fops = &zfs_osd_force_sync_fops },
+ { .name = "index_backup",
+ .fops = &zfs_osd_index_backup_fops },
{ 0 }
};
zapid = osd_get_name_n_idx(env, osd, fid, buf,
sizeof(info->oti_str), &zdn);
- if (!CFS_FAIL_CHECK(OBD_FAIL_OSD_NO_OI_ENTRY)) {
- if (osd->od_is_ost &&
- OBD_FAIL_CHECK(OBD_FAIL_OSD_COMPAT_INVALID_ENTRY))
- zde->zde_dnode++;
-
- if (!osd->od_is_ost ||
- !OBD_FAIL_CHECK(OBD_FAIL_OSD_COMPAT_NO_ENTRY)) {
- rc = osd_zap_add(osd, zapid, zdn, buf, 8, 1,
- zde, oh->ot_tx);
- if (rc)
- GOTO(out, rc);
- }
- }
+ if (CFS_FAIL_CHECK(OBD_FAIL_OSD_NO_OI_ENTRY) ||
+ (osd->od_is_ost && OBD_FAIL_CHECK(OBD_FAIL_OSD_COMPAT_NO_ENTRY)))
+ goto skip_add;
+
+ if (osd->od_is_ost && OBD_FAIL_CHECK(OBD_FAIL_OSD_COMPAT_INVALID_ENTRY))
+ zde->zde_dnode++;
+
+ rc = osd_zap_add(osd, zapid, zdn, buf, 8, 1, zde, oh->ot_tx);
+ if (rc)
+ GOTO(out, rc);
+skip_add:
obj->oo_dn = dn;
/* Now add in all of the "SA" attributes */
rc = osd_sa_handle_get(obj);
* power of two and this is checked for basic sanity.
*/
for (count = 0; count < max; count++) {
- snprintf(name, 15, "%s.%d", DMU_OSD_OI_NAME_BASE, count);
+ snprintf(name, sizeof(name) - 1, "%s.%d",
+ DMU_OSD_OI_NAME_BASE, count);
rc = osd_oi_lookup(env, o, o->od_root, name, &oi);
if (!rc)
continue;
RETURN(rc);
}
+/* Find or create the "index_backup" directory under the root and cache
+ * its object id in od_index_backup_id; the directory is addressed by
+ * the local FID INDEX_BACKUP_OID. */
+static int
+osd_oi_init_index_backup(const struct lu_env *env, struct osd_device *o)
+{
+	struct lu_fid *fid = &osd_oti_get(env)->oti_fid;
+	int rc;
+	ENTRY;
+
+	lu_local_obj_fid(fid, INDEX_BACKUP_OID);
+	rc = osd_obj_find_or_create(env, o, o->od_root, INDEX_BACKUP_DIR,
+				    &o->od_index_backup_id, fid, true);
+
+	RETURN(rc);
+}
+
static void
osd_oi_init_remote_parent(const struct lu_env *env, struct osd_device *o)
{
LASSERTF((sf->sf_oi_count & (sf->sf_oi_count - 1)) == 0,
"Invalid OI count in scrub file %d\n", sf->sf_oi_count);
+ rc = osd_oi_init_index_backup(env, o);
+ if (rc)
+ RETURN(rc);
+
osd_oi_init_remote_parent(env, o);
rc = osd_oi_init_compat(env, o);
#include <obd_class.h>
#include <lustre_nodemap.h>
#include <sys/dsl_dataset.h>
+#include <sys/zap_impl.h>
+#include <sys/zap.h>
+#include <sys/zap_leaf.h>
#include "osd_internal.h"
/* LFSCK */
{
.olm_name = LFSCK_DIR,
- .olm_flags = OLF_SCAN_SUBITEMS,
+ .olm_flags = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
.olm_scan_dir = osd_ios_general_sd,
.olm_handle_dirent = osd_ios_varfid_hd,
},
.olm_name = LUSTRE_NODEMAP_NAME,
},
+ /* index_backup */
+ {
+ .olm_name = INDEX_BACKUP_DIR,
+ .olm_fid = {
+ .f_seq = FID_SEQ_LOCAL_FILE,
+ .f_oid = INDEX_BACKUP_OID,
+ },
+ .olm_flags = OLF_SCAN_SUBITEMS | OLF_NOT_BACKUP,
+ .olm_scan_dir = osd_ios_general_sd,
+ .olm_handle_dirent = osd_ios_varfid_hd,
+ },
+
{
.olm_name = NULL
}
return 0;
}
+/* Check whether the index object @oid is damaged and must be re-built
+ * from its backup copy. */
+static bool osd_index_need_recreate(const struct lu_env *env,
+ struct osd_device *dev, uint64_t oid)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ zap_attribute_t *za = &info->oti_za2;
+ zap_cursor_t *zc = &info->oti_zc2;
+ int rc;
+ ENTRY;
+
+ /* Probe the ZAP by fetching its first record: any failure other
+ * than -ENOENT (which just means the index is empty) indicates the
+ * on-disk index object is unreadable and needs re-creation. */
+ zap_cursor_init_serialized(zc, dev->od_os, oid, 0);
+ rc = -zap_cursor_retrieve(zc, za);
+ zap_cursor_fini(zc);
+ if (rc && rc != -ENOENT)
+ RETURN(true);
+
+ RETURN(false);
+}
+
+/* Decide whether the object @oid (with FID @fid) is an index that should
+ * be covered by automatic backup and, if so, register it together with
+ * its key/record sizes. Only non-empty regular files implemented as
+ * fat ZAPs with 64-bit binary keys qualify; anything else is skipped
+ * silently (rc = 1). Key size is taken from the first leaf entry and
+ * assumed uniform across the whole index — TODO confirm. */
+static void osd_ios_index_register(const struct lu_env *env,
+ struct osd_device *osd,
+ const struct lu_fid *fid, uint64_t oid)
+{
+ struct osd_thread_info *info = osd_oti_get(env);
+ zap_attribute_t *za = &info->oti_za2;
+ zap_cursor_t *zc = &info->oti_zc2;
+ struct zap_leaf_entry *le;
+ dnode_t *dn = NULL;
+ sa_handle_t *hdl;
+ __u64 mode = 0;
+ __u32 keysize = 0;
+ __u32 recsize = 0;
+ int rc;
+ ENTRY;
+
+ /* -EEXIST/-ENOENT are treated as non-fatal: skip without logging. */
+ rc = __osd_obj2dnode(osd->od_os, oid, &dn);
+ if (rc == -EEXIST || rc == -ENOENT)
+ RETURN_EXIT;
+
+ if (rc < 0)
+ GOTO(log, rc);
+
+ if (!osd_object_is_zap(dn))
+ GOTO(log, rc = 1);
+
+ /* Fetch the POSIX mode via the SA layer to filter out anything
+ * that is not a regular file. */
+ rc = -sa_handle_get(osd->od_os, oid, NULL, SA_HDL_PRIVATE, &hdl);
+ if (rc)
+ GOTO(log, rc);
+
+ rc = -sa_lookup(hdl, SA_ZPL_MODE(osd), &mode, sizeof(mode));
+ sa_handle_destroy(hdl);
+ if (rc)
+ GOTO(log, rc);
+
+ if (!S_ISREG(mode))
+ GOTO(log, rc = 1);
+
+ zap_cursor_init_serialized(zc, osd->od_os, oid, 0);
+ rc = -zap_cursor_retrieve(zc, za);
+ if (rc)
+ /* Skip empty index object */
+ GOTO(fini, rc = (rc == -ENOENT ? 1 : rc));
+
+ /* Micro ZAPs and string-keyed ZAPs are not backup candidates. */
+ if (zc->zc_zap->zap_ismicro ||
+ !(zap_f_phys(zc->zc_zap)->zap_flags & ZAP_FLAG_UINT64_KEY))
+ GOTO(fini, rc = 1);
+
+ /* Key width in bytes: le_name_numints counts 8-byte integers. */
+ le = ZAP_LEAF_ENTRY(zc->zc_leaf, 0);
+ keysize = le->le_name_numints * 8;
+ recsize = za->za_integer_length * za->za_num_integers;
+ if (likely(keysize && recsize))
+ rc = osd_index_register(osd, fid, keysize, recsize);
+
+ GOTO(fini, rc);
+
+fini:
+ zap_cursor_fini(zc);
+
+log:
+ if (dn)
+ osd_dnode_rele(dn);
+ /* rc < 0: real failure; rc == 0: registered; rc == 1: silently
+ * skipped (not an eligible index). */
+ if (rc < 0)
+ CWARN("%s: failed to register index "DFID" (%u/%u): rc = %d\n",
+ osd_name(osd), PFID(fid), keysize, recsize, rc);
+ else if (!rc)
+ CDEBUG(D_LFSCK, "%s: registered index "DFID" (%u/%u)\n",
+ osd_name(osd), PFID(fid), keysize, recsize);
+}
+
+/* Restore one damaged index object: look up its backup copy in the
+ * "index_backup" ZAP (keyed by the target FID), resolve the backup
+ * object's own FID, then delegate the actual rebuild to the generic
+ * lustre_index_restore() helper. The outcome (success or failure) is
+ * logged; errors are not propagated to the caller. */
+static void osd_index_restore(const struct lu_env *env, struct osd_device *dev,
+ struct lustre_index_restore_unit *liru, void *buf,
+ int bufsize)
+{
+ struct luz_direntry *zde = &osd_oti_get(env)->oti_zde;
+ struct lu_fid *tgt_fid = &liru->liru_cfid;
+ struct lu_fid bak_fid;
+ int rc;
+ ENTRY;
+
+ /* Build the 64-bit-int ZAP key from the target FID, then find the
+ * backup entry in the index_backup directory. */
+ lustre_fid2lbx(buf, tgt_fid, bufsize);
+ rc = -zap_lookup(dev->od_os, dev->od_index_backup_id, buf, 8,
+ sizeof(*zde) / 8, (void *)zde);
+ if (rc)
+ GOTO(log, rc);
+
+ rc = osd_get_fid_by_oid(env, dev, zde->lzd_reg.zde_dnode, &bak_fid);
+ if (rc)
+ GOTO(log, rc);
+
+ /* The OI mapping for index may be invalid, since it will be
+ * re-created, not update the OI mapping, just cache it in RAM. */
+ rc = osd_idc_find_and_init_with_oid(env, dev, tgt_fid,
+ liru->liru_clid);
+ if (!rc)
+ rc = lustre_index_restore(env, &dev->od_dt_dev,
+ &liru->liru_pfid, tgt_fid, &bak_fid,
+ liru->liru_name, &dev->od_index_backup_list,
+ &dev->od_lock, buf, bufsize);
+ GOTO(log, rc);
+
+log:
+ CDEBUG(D_WARNING, "%s: restore index '%s' with "DFID": rc = %d\n",
+ osd_name(dev), liru->liru_name, PFID(tgt_fid), rc);
+}
+
/**
* verify FID-in-LMA and OI entry for one object
*
RETURN(0);
}
+ if (lma->lma_compat & LMAC_IDX_BACKUP &&
+ osd_index_need_recreate(env, dev, oid)) {
+ if (parent == dev->od_root) {
+ lu_local_obj_fid(&tfid,
+ OSD_FS_ROOT_OID);
+ } else {
+ rc = osd_get_fid_by_oid(env, dev,
+ parent, &tfid);
+ if (rc) {
+ nvlist_free(nvbuf);
+ RETURN(rc);
+ }
+ }
+
+ rc = lustre_liru_new(
+ &dev->od_index_restore_list,
+ &tfid, &lma->lma_self_fid, oid,
+ name, strlen(name));
+ nvlist_free(nvbuf);
+ RETURN(rc);
+ }
+
tfid = lma->lma_self_fid;
+ if (!(flags & OLF_NOT_BACKUP))
+ osd_ios_index_register(env, dev, &tfid, oid);
}
nvlist_free(nvbuf);
}
OBD_FREE_PTR(item);
}
+ if (!list_empty(&dev->od_index_restore_list)) {
+ char *buf;
+
+ OBD_ALLOC_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+ if (!buf)
+ CERROR("%s: not enough RAM for rebuild index\n",
+ osd_name(dev));
+
+ while (!list_empty(&dev->od_index_restore_list)) {
+ struct lustre_index_restore_unit *liru;
+
+ liru = list_entry(dev->od_index_restore_list.next,
+ struct lustre_index_restore_unit,
+ liru_link);
+ list_del(&liru->liru_link);
+ if (buf)
+ osd_index_restore(env, dev, liru, buf,
+ INDEX_BACKUP_BUFSIZE);
+ OBD_FREE(liru, liru->liru_len);
+ }
+
+ if (buf)
+ OBD_FREE_LARGE(buf, INDEX_BACKUP_BUFSIZE);
+ }
+
EXIT;
}
osd-*.*.full_scrub_threshold_rate=$rate
}
+# Enable automatic index backup (osd-*.*.index_backup=1) on all servers.
+scrub_enable_index_backup() {
+ do_nodes $(comma_list $(all_server_nodes)) $LCTL set_param -n \
+ osd-*.*.index_backup=1
+}
+
+# Disable automatic index backup (osd-*.*.index_backup=0) on all servers.
+scrub_disable_index_backup() {
+ do_nodes $(comma_list $(all_server_nodes)) $LCTL set_param -n \
+ osd-*.*.index_backup=0
+}
+
test_0() {
scrub_prep 0
echo "starting MDTs without disabling OI scrub"
}
run_test 15 "Dryrun mode OI scrub"
+test_16() {
+ # Index backup/restore is only implemented for osd-zfs so far.
+ # Quote the command substitution: if facet_fstype ever prints
+ # nothing, an unquoted expansion makes `[` fail with a syntax
+ # error instead of skipping the test cleanly.
+ [ "$(facet_fstype $SINGLEMDS)" != "zfs" ] &&
+ skip "only support zfs temporarily" && return
+
+ check_mount_and_prep
+ scrub_enable_index_backup
+
+ # Crash the index objects during umount so the initial OI scrub
+ # has to rebuild them from the backups on the next mount.
+ #define OBD_FAIL_OSD_INDEX_CRASH 0x199
+ do_nodes $(comma_list $(mdts_nodes)) $LCTL set_param fail_loc=0x199
+ scrub_prep 0
+ do_nodes $(comma_list $(mdts_nodes)) $LCTL set_param fail_loc=0
+
+ echo "starting MDTs without disabling OI scrub"
+ scrub_start_mds 1 "$MOUNT_OPTS_SCRUB"
+ mount_client $MOUNT || error "(2) Fail to start client!"
+ scrub_check_data 3
+ scrub_disable_index_backup
+}
+run_test 16 "Initial OI scrub can rebuild crashed index objects"
+
# restore MDS/OST size
MDSSIZE=${SAVED_MDSSIZE}
OSTSIZE=${SAVED_OSTSIZE}