From 6c25b2acc5f5a86229c758ffa8d2a313672dabe3 Mon Sep 17 00:00:00 2001 From: huanghua Date: Thu, 31 Aug 2006 14:21:10 +0000 Subject: [PATCH] added some open recovery prototype code. --- lustre/mdt/mdt_handler.c | 2 - lustre/mdt/mdt_internal.h | 1 + lustre/mdt/mdt_open.c | 213 +++++++++++++++++++++++++++++++++++++++++----- lustre/mdt/mdt_recovery.c | 14 +-- 4 files changed, 195 insertions(+), 35 deletions(-) diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 137e594..ce51010 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1546,9 +1546,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, struct ldlm_reply *ldlm_rep; struct mdt_lock_handle tmp_lock; struct mdt_lock_handle *lhc = &tmp_lock; - struct mdt_device *mdt = info->mti_mdt; __u64 child_bits; - int rc; ENTRY; diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 8d4929a..e34ed29 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -402,6 +402,7 @@ int mdt_done_writing(struct mdt_thread_info *info); void mdt_shrink_reply(struct mdt_thread_info *info, int offset); int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *, const struct md_attr *); +void mdt_reconstruct_open(struct mdt_thread_info *); void mdt_dump_lmm(int level, struct lov_mds_md *lmm); diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index b62c0a1..6fc5e66 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -300,6 +300,186 @@ static int mdt_mfd_open(struct mdt_thread_info *info, RETURN(rc); } +extern void mdt_req_from_mcd(struct ptlrpc_request *req, + struct mdt_client_data *mcd); + + +/* TODO: much of the code here can be shared with mdt_open() + * and mdt_mfd_open(). This will be done later. :-) + */ +void mdt_reconstruct_open(struct mdt_thread_info *info) +{ + struct lu_fid *child_fid = &info->mti_tmp_fid1; + __u32 flags = info->mti_spec.sp_cr_flags; + struct req_capsule *pill = &info->mti_pill; + struct ptlrpc_request *req = mdt_info_req(info); + struct mdt_export_data *med = &req->rq_export->exp_mdt_data; + struct mdt_client_data *mcd = med->med_mcd; + struct md_attr *ma = &info->mti_attr; + struct lu_attr *la = &ma->ma_attr; + struct mdt_device *mdt = info->mti_mdt; + struct mdt_reint_record *rr = &info->mti_rr; + const struct lu_context *ctxt = info->mti_ctxt; + struct mdt_object *parent; + struct mdt_object *child; + struct ldlm_reply *ldlm_rep; + struct mdt_body *repbody; + struct mdt_file_data *mfd; + struct list_head *h; + int result; + int isreg, isdir, islnk; + ENTRY; + + LASSERT(pill->rc_fmt == &RQF_LDLM_INTENT_OPEN); + + mdt_req_from_mcd(req, med->med_mcd); + mdt_set_disposition(info, ldlm_rep, le32_to_cpu(mcd->mcd_last_data)); + + if (mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) && req->rq_status) { + /* we did not create successfully, return error to client. */ + EXIT; + return; + } + + mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD); + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD); + parent = mdt_object_find(ctxt, mdt, rr->rr_fid1); + if (IS_ERR(parent)) { + /* FIXME: should this be assert? */ + CERROR("Cannot find parent "DFID" while reconstruct open\n", + PFID(rr->rr_fid1)); + GOTO(out, result = PTR_ERR(parent)); + } + + result = mdo_lookup(ctxt, mdt_object_child(parent), + rr->rr_name, child_fid); + if (result == 0) + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS); + else { + if (result == -ENOENT || result == -ESTALE) { + mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG); + result = -ENOENT; + } + GOTO(out_parent, result); + } + + LASSERT(lu_fid_eq(rr->rr_fid2, child_fid)); + child = mdt_object_find(ctxt, mdt, child_fid); + if (IS_ERR(child)) + GOTO(out_parent, result = PTR_ERR(child)); + + ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD); + ma->ma_lmm_size = mdt->mdt_max_mdsize; + ma->ma_need = MA_INODE | MA_LOV; + repbody = req_capsule_server_get(pill, &RMF_MDT_BODY); + repbody->eadatasize = 0; + repbody->aclsize = 0; + + result = mo_attr_get(ctxt, mdt_object_child(child), ma); + if (result == -EREMOTE) { + /* the object is on remote node + * return its FID for remote open */ + repbody->fid1 = *mdt_object_fid(child); + repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS); + GOTO(out_child, result = 0); + } else if(result != 0) + GOTO(out_child, result); + + LASSERT(ma->ma_valid & MA_INODE); + isreg = S_ISREG(la->la_mode); + isdir = S_ISDIR(la->la_mode); + islnk = S_ISLNK(la->la_mode); + mdt_pack_attr2body(repbody, la, mdt_object_fid(child)); + + /* if we are following a symlink, don't open; and + * do not return open handle for special nodes as client required + */ + if (islnk || (!isreg && !isdir && + (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH))) { + info->mti_trans_flags |= MDT_NONEED_TRANSNO; + GOTO(out_child, 0); + } + + mdt_set_disposition(info, ldlm_rep, DISP_OPEN_OPEN); + if (!mdt_get_disposition(ldlm_rep, DISP_OPEN_CREATE) && + (flags & MDS_OPEN_EXCL) && (flags & MDS_OPEN_CREAT)) + GOTO(out_child, result = -EEXIST); + + if (isdir) { + if (flags & (MDS_OPEN_CREAT | FMODE_WRITE)) + /* we are trying to create or + * write an existing dir. */ + GOTO(out_child, result = -EISDIR); + } else if (flags & MDS_OPEN_DIRECTORY) + GOTO(out_child, result = -ENOTDIR); + + /* at this point, regular file should have lov */ + LASSERT(ergo(isreg, ma->ma_valid & MA_LOV)); + + if (ma->ma_valid & MA_LOV) { + LASSERT(ma->ma_lmm_size); + repbody->eadatasize = ma->ma_lmm_size; + if (isdir) + repbody->valid |= OBD_MD_FLDIREA; + else + repbody->valid |= OBD_MD_FLEASIZE; + } + mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1); + + if (flags & FMODE_WRITE) { + /* FIXME: in recovery, need to pass old epoch here */ + result = mdt_get_write_access(mdt, child, 0); + if (result == 0) + repbody->io_epoch = child->mot_io_epoch; + } else if (flags & MDS_FMODE_EXEC) + result = mdt_deny_write_access(mdt, child); + if (result) + GOTO(out_child, result); + + result = mo_open(ctxt, mdt_object_child(child), flags); + if (result) + GOTO(out_child, result); + + mfd = NULL; + spin_lock(&med->med_open_lock); + list_for_each(h, &med->med_open_head) { + mfd = list_entry(h, struct mdt_file_data, mfd_list); + if (mfd->mfd_xid == req->rq_xid) { + break; + } + mfd = NULL; + } + spin_unlock(&med->med_open_lock); + + if (mfd != NULL) { + repbody->handle.cookie = mfd->mfd_handle.h_cookie; + GOTO(out_child, 0); + } + + mfd = mdt_mfd_new(); + if (mfd != NULL) { + mdt_object_get(ctxt, child); + + mfd->mfd_mode = flags; + mfd->mfd_object = child; + mfd->mfd_xid = req->rq_xid; + + med = &req->rq_export->exp_mdt_data; + spin_lock(&med->med_open_lock); + list_add(&mfd->mfd_list, &med->med_open_head); + spin_unlock(&med->med_open_lock); + repbody->handle.cookie = mfd->mfd_handle.h_cookie; + } else + result = -ENOMEM; + EXIT; +out_child: + mdt_object_put(ctxt, child); +out_parent: + mdt_object_put(ctxt, parent); +out: + req->rq_status = result; +} + static int mdt_open_by_fid(struct mdt_thread_info* info, const struct lu_fid *fid, struct ldlm_reply *rep, @@ -319,29 +499,20 @@ static int mdt_open_by_fid(struct mdt_thread_info* info, if (rc > 0) { /* successfully found the child object */ if (flags & MDS_OPEN_EXCL && flags & MDS_OPEN_CREAT) - rc = -EEXIST; + rc = -EEXIST; else { - struct txn_param txn; - struct thandle *th; - struct dt_device *dt = info->mti_mdt->mdt_bottom; - txn.tp_credits = 1; - - LASSERT(dt); - th = dt->dd_ops->dt_trans_start(info->mti_ctxt, - dt, &txn); - if (!IS_ERR(th)) { - dt->dd_ops->dt_trans_stop(info->mti_ctxt, th); - rc = mo_attr_get(info->mti_ctxt, - mdt_object_child(o), ma); - } - else - rc = PTR_ERR(th); - if (rc == 0) { - mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD); - mdt_set_disposition(info, rep, DISP_LOOKUP_POS); - mdt_set_disposition(info, rep, DISP_OPEN_OPEN); + const struct lu_context *ctxt = info->mti_ctxt; + struct mdt_device *mdt = info->mti_mdt; + + spin_lock(&mdt->mdt_transno_lock); + info->mti_transno = ++ mdt->mdt_last_transno; + spin_unlock(&mdt->mdt_transno_lock); + + mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD); + mdt_set_disposition(info, rep, DISP_LOOKUP_POS); + rc = mo_attr_get(ctxt, mdt_object_child(o), ma); + if (rc == 0) rc = mdt_mfd_open(info, NULL, o, flags, 0, rep); - } } } else if (rc == 0) { rc = -ENOENT; diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 8557633..0b4928f 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -663,8 +663,8 @@ void mdt_fs_cleanup(const struct lu_context *ctx, struct mdt_device *mdt) } /* reconstruction code */ -static inline void mdt_req_from_mcd(struct ptlrpc_request *req, - struct mdt_client_data *mcd) +void mdt_req_from_mcd(struct ptlrpc_request *req, + struct mdt_client_data *mcd) { DEBUG_REQ(D_HA, req, "restoring transno "LPD64"/status %d", mcd->mcd_last_transno, mcd->mcd_last_result); @@ -744,16 +744,6 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti) mdt_object_put(mti->mti_ctxt, obj); } -static void mdt_reconstruct_open(struct mdt_thread_info *mti) -{ - struct ptlrpc_request *req = mdt_info_req(mti); - struct mdt_export_data *med = &req->rq_export->exp_mdt_data; - - /*TODO: after open recovery task will be finished */ - - mdt_req_from_mcd(req, med->med_mcd); -} - typedef void (*mdt_reconstructor)(struct mdt_thread_info *mti); static mdt_reconstructor reconstructors[REINT_MAX] = { -- 1.8.3.1