X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fmdd%2Fmdd_orphans.c;h=506916eb91a329d3deed5d4261fdff4dddb91ab2;hb=d85c3cccc66e631da32db4d311b548e2e0ce2acd;hp=2ac658b0580fb353772f577ad688544ef0cf8308;hpb=6c8ed1d8edb0f6fe1d79172f7fa4a0fcf5053dfe;p=fs%2Flustre-release.git diff --git a/lustre/mdd/mdd_orphans.c b/lustre/mdd/mdd_orphans.c index 2ac658b..506916e 100644 --- a/lustre/mdd/mdd_orphans.c +++ b/lustre/mdd/mdd_orphans.c @@ -38,6 +38,7 @@ * Orphan handling code * * Author: Mike Pershin + * Pravin B Shelar */ #ifndef EXPORT_SYMTAB @@ -52,126 +53,368 @@ #include #include "mdd_internal.h" -const char orph_index_name[] = "orphans"; - -static const struct dt_index_features orph_index_features = { - .dif_flags = DT_IND_UPDATE, - .dif_keysize_min = sizeof(struct orph_key), - .dif_keysize_max = sizeof(struct orph_key), - .dif_recsize_min = sizeof(loff_t), - .dif_recsize_max = sizeof(loff_t) -}; +const char orph_index_name[] = "PENDING"; enum { ORPH_OP_UNLINK, ORPH_OP_TRUNCATE }; -static struct orph_key *orph_key_fill(const struct lu_env *env, - const struct lu_fid *lf, __u32 op) +#define ORPHAN_FILE_NAME_FORMAT "%016llx:%08x:%08x:%2x" +#define ORPHAN_FILE_NAME_FORMAT_18 "%llx:%08x" + +static struct dt_key* orph_key_fill(const struct lu_env *env, + const struct lu_fid *lf, __u32 op) { - struct orph_key *key = &mdd_env_info(env)->mti_orph_key; + char *key = mdd_env_info(env)->mti_orph_key; + int rc; + LASSERT(key); - fid_cpu_to_be(&key->ok_fid, lf); - key->ok_op = cpu_to_be32(op); - return key; + rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT, fid_seq(lf), + fid_oid(lf), fid_ver(lf), op); + if (rc > 0) + return (struct dt_key*) key; + else + return ERR_PTR(rc); } +static struct dt_key* orph_key_fill_18(const struct lu_env *env, + const struct lu_fid *lf) +{ + char *key = mdd_env_info(env)->mti_orph_key; + int rc; + + LASSERT(key); + rc = snprintf(key, NAME_MAX + 1, ORPHAN_FILE_NAME_FORMAT_18, fid_seq(lf), + fid_oid(lf)); + if (rc > 0) + return (struct dt_key*) key; + else + return ERR_PTR(rc); +} + +static int orphan_key_to_fid(char *key, struct lu_fid *lf) +{ + int rc = 0; + unsigned int op; + + rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT, &lf->f_seq, &lf->f_oid, + &lf->f_ver, &op); + if (rc == 4) + return 0; + + /* build igif */ + rc = sscanf(key, ORPHAN_FILE_NAME_FORMAT_18, + &lf->f_seq, &lf->f_oid); + if (rc == 2) { + lf->f_ver = 0; + return 0; + } + + CERROR("can not parse orphan file name %s\n",key); + return -EINVAL; +} + +static inline void mdd_orphan_write_lock(const struct lu_env *env, + struct mdd_device *mdd) +{ + + struct dt_object *dor = mdd->mdd_orphans; + dor->do_ops->do_write_lock(env, dor, MOR_TGT_ORPHAN); +} + +static inline void mdd_orphan_write_unlock(const struct lu_env *env, + struct mdd_device *mdd) +{ + + struct dt_object *dor = mdd->mdd_orphans; + dor->do_ops->do_write_unlock(env, dor); +} + +static inline int mdd_orphan_insert_obj(const struct lu_env *env, + struct mdd_device *mdd, + struct mdd_object *obj, + __u32 op, + struct thandle *th) +{ + struct dt_object *dor = mdd->mdd_orphans; + const struct lu_fid *lf = mdo2fid(obj); + struct dt_key *key = orph_key_fill(env, lf, op); + ENTRY; + + return dor->do_index_ops->dio_insert(env, dor, + __mdd_fid_rec(env, lf), + key, th, + BYPASS_CAPA, 1); +} + +static inline int mdd_orphan_delete_obj(const struct lu_env *env, + struct mdd_device *mdd , + struct dt_key *key, + struct thandle *th) +{ + struct dt_object *dor = mdd->mdd_orphans; + + return dor->do_index_ops->dio_delete(env, dor, + key, th, + BYPASS_CAPA); +} + +static inline void mdd_orphan_ref_add(const struct lu_env *env, + struct mdd_device *mdd, + struct thandle *th) +{ + struct dt_object *dor = mdd->mdd_orphans; + dor->do_ops->do_ref_add(env, dor, th); +} + +static inline void mdd_orphan_ref_del(const struct lu_env *env, + struct mdd_device *mdd, + struct thandle *th) +{ + struct dt_object *dor = mdd->mdd_orphans; + dor->do_ops->do_ref_del(env, dor, th); +} + + static int orph_index_insert(const struct lu_env *env, - struct mdd_object *obj, __u32 op, - loff_t *offset, struct thandle *th) + struct mdd_object *obj, + __u32 op, + struct thandle *th) { - struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); - struct dt_object *dor = mdd->mdd_orphans; - struct orph_key *key = orph_key_fill(env, mdo2fid(obj), op); + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + struct dt_object *dor = mdd->mdd_orphans; + const struct lu_fid *lf_dor = lu_object_fid(&dor->do_lu); + struct dt_object *next = mdd_object_child(obj); + const struct dt_key *dotdot = (const struct dt_key *) ".."; int rc; ENTRY; - rc = dor->do_index_ops->dio_insert(env, dor, (struct dt_rec *)offset, - (struct dt_key *)key, th, - BYPASS_CAPA); + mdd_orphan_write_lock(env, mdd); + + rc = mdd_orphan_insert_obj(env, mdd, obj, op, th); + if (rc) + GOTO(out, rc); + + mdo_ref_add(env, obj, th); + if (!S_ISDIR(mdd_object_type(obj))) + goto out; + + mdo_ref_add(env, obj, th); + mdd_orphan_ref_add(env, mdd, th); + + /* try best to fixup directory, dont return errors + * from here */ + if (!dt_try_as_dir(env, next)) + goto out; + next->do_index_ops->dio_delete(env, next, + dotdot, th, BYPASS_CAPA); + + next->do_index_ops->dio_insert(env, next, + __mdd_fid_rec(env, lf_dor), + dotdot, th, BYPASS_CAPA, 1); + +out: + mdd_orphan_write_unlock(env, mdd); + RETURN(rc); } +/** + * destroy osd object on mdd and associated ost objects. + * + * \param obj orphan object + * \param mdd used for sending llog msg to osts + * + * \retval 0 success + * \retval -ve error + */ +static int orphan_object_kill(const struct lu_env *env, + struct mdd_object *obj, + struct mdd_device *mdd, + struct thandle *th) +{ + struct lu_attr *la = &mdd_env_info(env)->mti_la; + int rc; + + /* No need to lock this object as its recovery phase, and + * no other thread can access it. But we need to lock it + * as its precondition for osd api we using. */ + + mdd_write_lock(env, obj, MOR_TGT_CHILD); + mdo_ref_del(env, obj, th); + if (S_ISDIR(mdd_object_type(obj))) { + mdo_ref_del(env, obj, th); + mdd_orphan_ref_del(env, mdd, th); + mdd_write_unlock(env, obj); + } else { + /* regular file , cleanup linked ost objects */ + rc = mdd_la_get(env, obj, la, BYPASS_CAPA); + mdd_write_unlock(env, obj); + if (rc) + RETURN(rc); + + mdd_lov_destroy(env, mdd, obj, la); + } + return 0; +} + static int orph_index_delete(const struct lu_env *env, - struct mdd_object *obj, __u32 op, + struct mdd_object *obj, + __u32 op, struct thandle *th) { struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); struct dt_object *dor = mdd->mdd_orphans; - struct orph_key *key = orph_key_fill(env, mdo2fid(obj), op); + struct dt_key *key; int rc; + ENTRY; + LASSERT(dor); - rc = dor->do_index_ops->dio_delete(env, dor, - (struct dt_key *)key, th, - BYPASS_CAPA); - RETURN(rc); + key = orph_key_fill(env, mdo2fid(obj), op); + mdd_orphan_write_lock(env, mdd); + + rc = mdd_orphan_delete_obj(env, mdd, key, th); + + if (rc == -ENOENT) { + key = orph_key_fill_18(env, mdo2fid(obj)); + rc = mdd_orphan_delete_obj(env, mdd, key, th); + } + + if (!rc) { + /* lov objects will be destroyed by caller */ + mdo_ref_del(env, obj, th); + if (S_ISDIR(mdd_object_type(obj))) { + mdo_ref_del(env, obj, th); + mdd_orphan_ref_del(env, mdd, th); + } + } else + CERROR("could not delete object: rc = %d\n",rc); + + obj->mod_flags &= ~ORPHAN_OBJ; + mdd_orphan_write_unlock(env, mdd); + RETURN(rc); } -static inline struct orph_key *orph_key_empty(const struct lu_env *env, - __u32 op) + +static int orphan_object_destroy(const struct lu_env *env, + struct mdd_object *obj, + struct dt_key *key) { - struct orph_key *key = &mdd_env_info(env)->mti_orph_key; - LASSERT(key); - fid_zero(&key->ok_fid); - key->ok_op = cpu_to_be32(op); - return key; + struct thandle *th = NULL; + struct mdd_device *mdd = mdo2mdd(&obj->mod_obj); + int rc; + ENTRY; + + mdd_txn_param_build(env, mdd, MDD_TXN_UNLINK_OP); + th = mdd_trans_start(env, mdd); + if (IS_ERR(th)) { + CERROR("Cannot get thandle\n"); + RETURN(-ENOMEM); + } + + mdd_orphan_write_lock(env, mdd); + rc = mdd_orphan_delete_obj(env, mdd, key, th); + if (!rc) + orphan_object_kill(env, obj, mdd, th); + else + CERROR("could not delete object: rc = %d\n",rc); + + mdd_orphan_write_unlock(env, mdd); + mdd_trans_stop(env, mdd, 0, th); + + RETURN(rc); } -static void orph_key_test_and_del(const struct lu_env *env, - struct mdd_device *mdd, - const struct orph_key *key) +static int orph_key_test_and_del(const struct lu_env *env, + struct mdd_device *mdd, + struct lu_fid *lf, + struct dt_key *key) { struct mdd_object *mdo; + int rc; + + mdo = mdd_object_find(env, mdd, lf); - mdo = mdd_object_find(env, mdd, &key->ok_fid); if (IS_ERR(mdo)) - CERROR("Invalid orphan!\n"); - else { - mdd_write_lock(env, mdo, MOR_TGT_CHILD); - if (mdo->mod_count == 0) { - /* non-opened orphan, let's delete it */ - struct md_attr *ma = &mdd_env_info(env)->mti_ma; - CWARN("Found orphan!\n"); - mdd_object_kill(env, mdo, ma); - /* TODO: now handle OST objects */ - //mdd_ost_objects_destroy(env, ma); - /* TODO: destroy index entry */ - } - mdd_write_unlock(env, mdo); - mdd_object_put(env, mdo); + return PTR_ERR(mdo); + + rc = -EBUSY; + if (mdo->mod_count == 0) { + CWARN("Found orphan!\n"); + rc = orphan_object_destroy(env, mdo, key); + } else { + mdo->mod_flags |= ORPHAN_OBJ; } + + mdd_object_put(env, mdo); + return rc; } static int orph_index_iterate(const struct lu_env *env, struct mdd_device *mdd) { - struct dt_object *dt_obj = mdd->mdd_orphans; + struct dt_object *dor = mdd->mdd_orphans; + char *mti_key = mdd_env_info(env)->mti_orph_key; + const struct dt_it_ops *iops; struct dt_it *it; - struct dt_it_ops *iops; - struct orph_key *key = orph_key_empty(env, 0); - int result; + char *key; + struct lu_fid fid; + int result = 0; + int key_sz = 0; + int rc; + __u64 cookie; ENTRY; - iops = &dt_obj->do_index_ops->dio_it; - it = iops->init(env, dt_obj, 1, BYPASS_CAPA); + /* In recovery phase, do not need for any lock here */ + + iops = &dor->do_index_ops->dio_it; + it = iops->init(env, dor, BYPASS_CAPA); if (it != NULL) { - result = iops->get(env, it, (const void *)key); + result = iops->get(env, it, (const void *)""); if (result > 0) { - int i; /* main cycle */ - for (result = 0, i = 0; result == +1; ++i) { + do { + key = (void *)iops->key(env, it); - fid_be_to_cpu(&key->ok_fid, &key->ok_fid); - orph_key_test_and_del(env, mdd, key); + if (IS_ERR(key)) + goto next; + key_sz = iops->key_size(env, it); + + /* filter out "." and ".." entries from + * PENDING dir. */ + if (key_sz < 8) + goto next; + + memcpy(mti_key, key, key_sz); + mti_key[key_sz] = 0; + + if (orphan_key_to_fid(mti_key, &fid)) + goto next; + if (!fid_is_sane(&fid)) + goto next; + + /* kill orphan object */ + cookie = iops->store(env, it); + iops->put(env, it); + rc = orph_key_test_and_del(env, mdd, &fid, + (struct dt_key *)mti_key); + + /* after index delete reset iterator */ + if (!rc) + result = iops->get(env, it, + (const void *)""); + else + result = iops->load(env, it, cookie); +next: result = iops->next(env, it); - } + } while (result == 0); + result = 0; } else if (result == 0) /* Index contains no zero key? */ result = -EIO; - iops->put(env, it); iops->fini(env, it); } else @@ -184,17 +427,17 @@ int orph_index_init(const struct lu_env *env, struct mdd_device *mdd) { struct lu_fid fid; struct dt_object *d; - int rc; + int rc = 0; ENTRY; - d = dt_store_open(env, mdd->mdd_child, orph_index_name, &fid); + d = dt_store_open(env, mdd->mdd_child, "", orph_index_name, &fid); if (!IS_ERR(d)) { mdd->mdd_orphans = d; - rc = d->do_ops->do_index_try(env, d, &orph_index_features); - if (rc == 0) - LASSERT(d->do_index_ops != NULL); - else - CERROR("\"%s\" is not an index!\n", orph_index_name); + if (!dt_try_as_dir(env, d)) { + rc = -ENOTDIR; + CERROR("\"%s\" is not an index! : rc = %d\n", + orph_index_name, rc); + } } else { CERROR("cannot find \"%s\" obj %d\n", orph_index_name, (int)PTR_ERR(d)); @@ -214,18 +457,45 @@ void orph_index_fini(const struct lu_env *env, struct mdd_device *mdd) EXIT; } +/** + * Iterate orphan index to cleanup orphan objects in case of recovery. + * \param d mdd device in recovery. + * + */ + int __mdd_orphan_cleanup(const struct lu_env *env, struct mdd_device *d) { return orph_index_iterate(env, d); } +/** + * delete an orphan \a obj from orphan index. + * \param obj file or directory. + * \param th transaction for index insert. + * + * \pre obj nlink == 0 && obj->mod_count != 0 + * + * \retval 0 success + * \retva -ve index operation error. + */ + int __mdd_orphan_add(const struct lu_env *env, struct mdd_object *obj, struct thandle *th) { - loff_t offset = 0; - return orph_index_insert(env, obj, ORPH_OP_UNLINK, &offset, th); + return orph_index_insert(env, obj, ORPH_OP_UNLINK, th); } +/** + * delete an orphan \a obj from orphan index. + * \param obj file or directory. + * \param th transaction for index deletion and object destruction. + * + * \pre obj->mod_count == 0 && ORPHAN_OBJ is set for obj. + * + * \retval 0 success + * \retva -ve index operation error. + */ + int __mdd_orphan_del(const struct lu_env *env, struct mdd_object *obj, struct thandle *th) {