From 1fbeb41aa6c85b0d37d257d3e391652d61438f3d Mon Sep 17 00:00:00 2001 From: tappro Date: Sat, 12 Aug 2006 11:21:29 +0000 Subject: [PATCH] - introduce open counter in mdd - update mo_close() method with adding the md_attr - split mdd_attr_get() into two sub-function __mdd_atte_get, __mdd_lov_get because they are needed for internal use in mdd - return lov ea and llog cookie in mdt_close() for unlinked file - mdt_handle_last_unlink() checks the ma_valid fields to decide what to return to the client --- lustre/cmm/cmm_object.c | 8 ++- lustre/include/md_object.h | 8 ++- lustre/mdd/mdd_handler.c | 138 ++++++++++++++++++++++++++++++++++----------- lustre/mdd/mdd_internal.h | 2 + lustre/mdt/mdt_handler.c | 9 ++- lustre/mdt/mdt_internal.h | 7 +-- lustre/mdt/mdt_lib.c | 41 ++++++-------- lustre/mdt/mdt_open.c | 39 +++++++------ 8 files changed, 161 insertions(+), 91 deletions(-) diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index 9a68ca1..a275d6f 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -309,11 +309,12 @@ static int cml_open(const struct lu_context *ctx, struct md_object *mo) RETURN(rc); } -static int cml_close(const struct lu_context *ctx, struct md_object *mo) +static int cml_close(const struct lu_context *ctx, struct md_object *mo, + struct md_attr *ma) { int rc; ENTRY; - rc = mo_close(ctx, md_object_next(mo)); + rc = mo_close(ctx, md_object_next(mo), ma); RETURN(rc); } @@ -612,7 +613,8 @@ static int cmr_open(const struct lu_context *ctx, struct md_object *mo) RETURN(-EREMOTE); } -static int cmr_close(const struct lu_context *ctx, struct md_object *mo) +static int cmr_close(const struct lu_context *ctx, struct md_object *mo, + struct md_attr *ma) { RETURN(-EFAULT); } diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 56e0c1f..18a2053 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -116,7 +116,8 @@ struct md_object_operations { int (*moo_ref_del)(const struct lu_context *, struct md_object *, struct md_attr *); int (*moo_open)(const struct lu_context *, struct md_object *); - int (*moo_close)(const struct lu_context *, struct md_object *); + int (*moo_close)(const struct lu_context *, struct md_object *, + struct md_attr *); }; /* @@ -295,10 +296,11 @@ static inline int mo_open(const struct lu_context *cx, struct md_object *m) return m->mo_ops->moo_open(cx, m); } -static inline int mo_close(const struct lu_context *cx, struct md_object *m) +static inline int mo_close(const struct lu_context *cx, struct md_object *m, + struct md_attr *ma) { LASSERT(m->mo_ops->moo_close); - return m->mo_ops->moo_close(cx, m); + return m->mo_ops->moo_close(cx, m, ma); } static inline int mo_readpage(const struct lu_context *cx, struct md_object *m, diff --git a/lustre/mdd/mdd_handler.c b/lustre/mdd/mdd_handler.c index 828352b..659754c 100644 --- a/lustre/mdd/mdd_handler.c +++ b/lustre/mdd/mdd_handler.c @@ -183,6 +183,7 @@ static int mdd_may_create(const struct lu_context *ctxt, /*check pobj may create or not*/ RETURN(0); } + /*Check whether it may delete the cobj under the pobj*/ static int mdd_may_delete(const struct lu_context *ctxt, struct mdd_object *pobj, struct mdd_object *cobj, @@ -214,34 +215,64 @@ static int mdd_may_delete(const struct lu_context *ctxt, RETURN(rc); } -static int mdd_attr_get(const struct lu_context *ctxt, - struct md_object *obj, struct md_attr *ma) +static int __mdd_attr_get(const struct lu_context *ctxt, + struct mdd_object *obj, struct md_attr *ma) { - struct mdd_object *mdd_obj = md2mdd_obj(obj); struct dt_object *next; int rc; ENTRY; - LASSERT(lu_object_exists(ctxt, &obj->mo_lu)); + LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj))); - next = mdd_object_child(mdd_obj); + next = mdd_object_child(obj); rc = next->do_ops->do_attr_get(ctxt, next, &ma->ma_attr); if (rc == 0) { LASSERT((ma->ma_attr.la_mode & S_IFMT) == - (obj->mo_lu.lo_header->loh_attr & S_IFMT)); + (lu_object_attr(mdd2lu_obj(obj)) & S_IFMT)); ma->ma_valid = MA_INODE; - /* get LOV EA also */ - if ((S_ISREG(ma->ma_attr.la_mode) - || S_ISDIR(ma->ma_attr.la_mode)) - && ma->ma_lmm != 0 && ma->ma_lmm_size > 0) { - rc = mdd_get_md(ctxt, obj, ma->ma_lmm, &ma->ma_lmm_size); - if (rc > 0) { - ma->ma_valid |= MA_LOV; - rc = 0; - } + } + + RETURN(rc); +} + +static int __mdd_lov_get(const struct lu_context *ctxt, + struct mdd_object *obj, struct md_attr *ma) +{ + struct dt_object *next; + int rc = 0; + + ENTRY; + + next = mdd_object_child(obj); + if ((S_ISREG(ma->ma_attr.la_mode) || S_ISDIR(ma->ma_attr.la_mode)) + && ma->ma_lmm != 0 && ma->ma_lmm_size > 0) { + rc = mdd_get_md(ctxt, &obj->mod_obj, + ma->ma_lmm, &ma->ma_lmm_size); + if (rc > 0) { + ma->ma_valid |= MA_LOV; + rc = 0; } } + + RETURN(rc); +} + +static int mdd_attr_get(const struct lu_context *ctxt, + struct md_object *obj, struct md_attr *ma) +{ + struct mdd_object *mdd_obj = md2mdd_obj(obj); + struct dt_object *next; + int rc; + + ENTRY; + + next = mdd_object_child(mdd_obj); + rc = __mdd_attr_get(ctxt, mdd_obj, ma); + if (rc == 0) { + /* get LOV EA also */ + rc = __mdd_lov_get(ctxt, mdd_obj, ma); + } CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n", rc, ma->ma_valid); RETURN(rc); @@ -781,6 +812,29 @@ static int mdd_dir_is_empty(const struct lu_context *ctx, return result; } +static int __mdd_finish_unlink(const struct lu_context *ctxt, + struct mdd_object *obj, struct md_attr *ma) +{ + int rc; + ENTRY; + rc = __mdd_attr_get(ctxt, obj, ma); + if (rc) + RETURN(rc); + + if (atomic_read(&obj->mod_count) == 0 && + ma->ma_attr.la_nlink == 0) { + rc = __mdd_lov_get(ctxt, obj, ma); + if (rc) { + CERROR("Can't get LOV attr during unlink()\n"); + } + if(S_ISREG(ma->ma_attr.la_mode) && + ma->ma_valid & MA_LOV) + rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj), + obj, ma); + } + RETURN(rc); +} + static int mdd_unlink_sanity_check(const struct lu_context *ctxt, struct mdd_object *pobj, struct mdd_object *cobj, @@ -795,7 +849,7 @@ static int mdd_unlink_sanity_check(const struct lu_context *ctxt, RETURN(rc); if (S_ISDIR(mdd_object_type(cobj)) && - dt_try_as_dir(ctxt, dt_cobj)) { + dt_try_as_dir(ctxt, dt_cobj)) { rc = mdd_dir_is_empty(ctxt, cobj); if (rc != 0) RETURN(rc); @@ -832,23 +886,15 @@ static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj, GOTO(cleanup, rc); __mdd_ref_del(ctxt, mdd_cobj, handle); - if (S_ISDIR(ma->ma_attr.la_mode)) { + if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) { /* unlink dot */ __mdd_ref_del(ctxt, mdd_cobj, handle); /* unlink dotdot */ __mdd_ref_del(ctxt, mdd_pobj, handle); } - mdd_attr_get(ctxt, cobj, ma); - - mdd_set_dead_obj(mdd_cobj); -#if 0 - /*This should be moved to handle last unlink. wait open - * orphan prototype finished*/ - if (S_ISREG(ma->ma_attr.la_mode) && (ma->ma_valid & MA_LOV) && - ma->ma_attr.la_nlink == 0 && cobj->mo_lu.lo_header->loh_ref == 1) { - rc = mdd_unlink_log(ctxt, mdd, mdd_cobj, ma); - } -#endif + + rc = __mdd_finish_unlink(ctxt, mdd_cobj, ma); + cleanup: mdd_unlock2(ctxt, mdd_pobj, mdd_cobj); mdd_trans_stop(ctxt, mdd, handle); @@ -996,10 +1042,11 @@ static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj, if (tobj) mdd_tobj = md2mdd_obj(tobj); + /*XXX: shouldn't this check be done under lock below? */ rc = mdd_rename_sanity_check(ctxt, mdd_spobj, mdd_tpobj, mdd_sobj, mdd_tobj); if (rc) - GOTO(cleanup, rc); + RETURN(rc); mdd_txn_param_build(ctxt, &MDD_TXN_RENAME); handle = mdd_trans_start(ctxt, mdd); @@ -1546,6 +1593,7 @@ static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj, struct mdd_object *mdd_obj = md2mdd_obj(obj); struct mdd_device *mdd = mdo2mdd(obj); struct thandle *handle; + int rc; ENTRY; mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET); @@ -1554,22 +1602,44 @@ static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj, RETURN(-ENOMEM); mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK); + + /* rmdir checks */ + if (S_ISDIR(lu_object_attr(&obj->mo_lu)) && + dt_try_as_dir(ctxt, mdd_object_child(mdd_obj))) { + rc = mdd_dir_is_empty(ctxt, mdd_obj); + if (rc != 0) + GOTO(cleanup, rc); + } + __mdd_ref_del(ctxt, mdd_obj, handle); - mdd_attr_get(ctxt, obj, ma); - mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK); + if (S_ISDIR(lu_object_attr(&obj->mo_lu))) { + /* unlink dot */ + __mdd_ref_del(ctxt, mdd_obj, handle); + } + + rc = __mdd_finish_unlink(ctxt, mdd_obj, ma); +cleanup: + mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK); mdd_trans_stop(ctxt, mdd, handle); - - RETURN(0); + RETURN(rc); } static int mdd_open(const struct lu_context *ctxt, struct md_object *obj) { + atomic_inc(&md2mdd_obj(obj)->mod_count); return 0; } -static int mdd_close(const struct lu_context *ctxt, struct md_object *obj) +static int mdd_close(const struct lu_context *ctxt, struct md_object *obj, + struct md_attr *ma) { + __mdd_attr_get(ctxt, md2mdd_obj(obj), ma); + + if (atomic_dec_and_test(&md2mdd_obj(obj)->mod_count)) { + CWARN("File closed\n"); + } + return 0; } diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index c355615..1969a1f 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -48,6 +48,8 @@ enum mod_flags { struct mdd_object { struct md_object mod_obj; + /* open count */ + atomic_t mod_count; unsigned long mod_flags; }; diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 65447f5..6f9ea7a 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1307,7 +1307,7 @@ static int mdt_recovery(struct ptlrpc_request *req) rc = mds_filter_recovery_request(req, obd, &should_process); if (rc != 0 || !should_process) { - LASSERT(rc < 0); + //LASSERT(rc < 0); RETURN(rc); } } @@ -2608,6 +2608,7 @@ static int mdt_destroy_export(struct obd_export *export) struct obd_device *obd = export->exp_obd; struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev); struct lu_context ctxt; + struct md_attr ma; int rc = 0; ENTRY; @@ -2628,13 +2629,17 @@ static int mdt_destroy_export(struct obd_export *export) struct list_head *tmp = med->med_open_head.next; struct mdt_file_data *mfd = list_entry(tmp, struct mdt_file_data, mfd_list); + struct mdt_object *o = mfd->mfd_object; /* Remove mfd handle so it can't be found again. * We are consuming the mfd_list reference here. */ class_handle_unhash(&mfd->mfd_handle); list_del_init(&mfd->mfd_list); spin_unlock(&med->med_open_lock); - mdt_mfd_close(&ctxt, mdt, mfd); + mdt_mfd_close(&ctxt, mdt, mfd, &ma); + /* TODO: if we close the unlinked file, + * we need to remove it's objects from OST */ + mdt_object_put(&ctxt, o); spin_lock(&med->med_open_lock); } spin_unlock(&med->med_open_lock); diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 1717ab6..4590d12 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -357,11 +357,6 @@ void mdt_fs_cleanup(const struct lu_context *ctxt, int mdt_client_free(const struct lu_context *ctxt, struct mdt_device *mdt, struct mdt_export_data *med); - -int mdt_update_server_data(const struct lu_context *ctxt, - struct mdt_device *mdt, - int sync); - int mdt_client_add(const struct lu_context *ctxt, struct mdt_device *mdt, struct mdt_export_data *med, @@ -376,7 +371,7 @@ int mdt_lock_new_child(struct mdt_thread_info *info, int mdt_reint_open(struct mdt_thread_info *info); void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt, - struct mdt_file_data *mfd); + struct mdt_file_data *mfd, struct md_attr *ma); int mdt_close(struct mdt_thread_info *info); diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index d0797b0..a618fe7 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -105,31 +105,26 @@ int mdt_handle_last_unlink(struct mdt_thread_info *info, struct mdt_object *mo, const struct lu_attr *la = &ma->ma_attr; ENTRY; - - /* if this is the last unlinked object reference, - * so client should destroy ost objects*/ - if (S_ISREG(la->la_mode) && - la->la_nlink == 0 && mo->mot_header.loh_ref == 1) { - - CDEBUG(D_INODE, "Last reference is released on "DFID"\n", - PFID(mdt_object_fid(mo))); - - CDEBUG(D_INODE, "ma_valid = "LPX64"\n", ma->ma_valid); - repbody = req_capsule_server_get(&info->mti_pill, - &RMF_MDT_BODY); - if (ma->ma_valid & MA_INODE) - mdt_pack_attr2body(repbody, la, mdt_object_fid(mo)); - if (/*ma->ma_lmm_size && */(ma->ma_valid & MA_LOV)) { - LASSERT(ma->ma_lmm_size); - mdt_dump_lmm(D_INFO, ma->ma_lmm); - repbody->eadatasize = ma->ma_lmm_size; + repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); + + if (ma->ma_valid & MA_INODE) + mdt_pack_attr2body(repbody, la, mdt_object_fid(mo)); + + if (ma->ma_valid & MA_LOV) { + LASSERT(ma->ma_lmm_size); + mdt_dump_lmm(D_INFO, ma->ma_lmm); + repbody->eadatasize = ma->ma_lmm_size; + if (S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu))) repbody->valid |= OBD_MD_FLEASIZE; - } - - if (ma->ma_cookie_size && (ma->ma_valid & MA_COOKIE)) - repbody->valid |= OBD_MD_FLCOOKIE; + else if (S_ISDIR(lu_object_attr(&mo->mot_obj.mo_lu))) + repbody->valid |= OBD_MD_FLDIREA; + else + LBUG(); } - + + if (ma->ma_cookie_size && (ma->ma_valid & MA_COOKIE)) + repbody->valid |= OBD_MD_FLCOOKIE; + RETURN(0); } diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 74af20a..0fe38f1 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -290,7 +290,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info, /* keep a reference on this object for this open, * and is released by mdt_mfd_close() */ mdt_object_get(info->mti_ctxt, o); - + /* open hanling */ + mo_open(info->mti_ctxt, mdt_object_child(o)); + mfd->mfd_mode = flags; mfd->mfd_object = o; mfd->mfd_xid = mdt_info_req(info)->rq_xid; @@ -470,8 +472,9 @@ out: return result; } -void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt, - struct mdt_file_data *mfd) +void mdt_mfd_close(const struct lu_context *ctxt, + struct mdt_device *mdt, struct mdt_file_data *mfd, + struct md_attr *ma) { struct mdt_object *o = mfd->mfd_object; ENTRY; @@ -481,22 +484,18 @@ void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt, } else if (mfd->mfd_mode & MDS_FMODE_EXEC) { mdt_allow_write_access(o); } - - /* release reference on this object. - * it will be destroyed by lower layer if necessary. - */ - mdt_object_put(ctxt, mfd->mfd_object); - + mdt_mfd_free(mfd); - EXIT; + + mo_close(ctxt, mdt_object_child(o), ma); } int mdt_close(struct mdt_thread_info *info) { - struct md_attr *ma = &info->mti_attr; struct mdt_export_data *med; struct mdt_file_data *mfd; struct mdt_object *o; + struct md_attr *ma = &info->mti_attr; int rc; ENTRY; @@ -514,17 +513,17 @@ int mdt_close(struct mdt_thread_info *info) class_handle_unhash(&mfd->mfd_handle); list_del_init(&mfd->mfd_list); spin_unlock(&med->med_open_lock); - - o = mfd->mfd_object; + ma->ma_lmm = req_capsule_server_get(&info->mti_pill, - &RMF_MDT_MD); + &RMF_MDT_MD); ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill, - &RMF_MDT_MD, RCL_SERVER); - rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma); - if (rc == 0) - rc = mdt_handle_last_unlink(info, o, ma); - - mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd); + &RMF_MDT_MD, + RCL_SERVER); + o = mfd->mfd_object; + mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd, ma); + rc = mdt_handle_last_unlink(info, o, ma); + /* release reference on this object. */ + mdt_object_put(info->mti_ctxt, o); } mdt_shrink_reply(info); RETURN(rc); -- 1.8.3.1