From d69840b9dd14776171b59ccb12c4b65765573b5d Mon Sep 17 00:00:00 2001 From: huanghua Date: Thu, 27 Apr 2006 06:58:21 +0000 Subject: [PATCH] add intent support intially, and much work are still needed. --- lustre/mdt/mdt_handler.c | 282 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 266 insertions(+), 16 deletions(-) diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 07c94ad..0ce79af 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -53,6 +53,9 @@ /* lu2dt_dev() */ #include +/*LUSTRE_POSIX_ACL_MAX_SIZE*/ +#include + /* struct mds_client_data */ #include "../mds/mds_internal.h" @@ -246,6 +249,30 @@ static int mdt_readpage(struct mdt_thread_info *info, return -EOPNOTSUPP; } +int mdt_reint_internal(struct mdt_thread_info *info, + struct ptlrpc_request *req, + int offset, + struct lustre_handle *lockh) +{ + struct mdt_reint_record *rec; /* 116 bytes on the stack? no sir! */ + int rc; + + OBD_ALLOC(rec, sizeof(*rec)); + if (rec == NULL) + RETURN(-ENOMEM); + + rc = mdt_reint_unpack(info, req, offset, rec); + if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) { + CERROR("invalid record\n"); + GOTO(out, rc = -EINVAL); + } + + rc = mdt_reint_rec(info, rec, offset, req, lockh); +out: + OBD_FREE(rec, sizeof(*rec)); + RETURN(rc); +} + static int mdt_reint(struct mdt_thread_info *info, struct ptlrpc_request *req, int offset) { @@ -255,7 +282,6 @@ static int mdt_reint(struct mdt_thread_info *info, int size[] = { sizeof(struct mdt_body), sizeof(struct lov_mds_md), /*FIXME*/ sizeof(struct llog_cookie)}; int bufcount,rc; - struct mdt_reint_record *rec; /* 116 bytes on the stack? no sir! */ ENTRY; @@ -279,20 +305,7 @@ static int mdt_reint(struct mdt_thread_info *info, rc = lustre_pack_reply(req, bufcount, size, NULL); if (rc) RETURN (rc); - - OBD_ALLOC(rec, sizeof(*rec)); - if (rec == NULL) - RETURN(-ENOMEM); - - rc = mdt_reint_unpack(info, req, offset, rec); - if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) { - CERROR("invalid record\n"); - GOTO(out, req->rq_status = -EINVAL); - } - - rc = mdt_reint_rec(info, rec, offset, req, NULL); -out: - OBD_FREE(rec, sizeof(*rec)); + rc = mdt_reint_internal(info, req, offset, NULL); RETURN(rc); } @@ -958,12 +971,249 @@ static int mdt_handle(struct ptlrpc_request *req) RETURN(result); } +/*Please move these function from mds to mdt*/ +int intent_disposition(struct ldlm_reply *rep, int flag) +{ + if (!rep) + return 0; + return (rep->lock_policy_res1 & flag); +} + +void intent_set_disposition(struct ldlm_reply *rep, int flag) +{ + if (!rep) + return; + rep->lock_policy_res1 |= flag; +} + +static void fixup_handle_for_resent_req(struct mdt_thread_info *info, + struct ptlrpc_request *req, + int offset, + struct ldlm_lock *new_lock, + struct ldlm_lock **old_lock, + struct lustre_handle *lockh) +{ + struct obd_export *exp = req->rq_export; + struct mdt_device * mdt = info->mti_mdt; + struct ldlm_request *dlmreq = + lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq)); + struct lustre_handle remote_hdl = dlmreq->lock_handle1; + struct list_head *iter; + + if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT)) + return; + + l_lock(&mdt->mdt_namespace->ns_lock); + list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) { + struct ldlm_lock *lock; + lock = list_entry(iter, struct ldlm_lock, l_export_chain); + if (lock == new_lock) + continue; + if (lock->l_remote_handle.cookie == remote_hdl.cookie) { + lockh->cookie = lock->l_handle.h_cookie; + LDLM_DEBUG(lock, "restoring lock cookie"); + DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64, + lockh->cookie); + if (old_lock) + *old_lock = LDLM_LOCK_GET(lock); + l_unlock(&mdt->mdt_namespace->ns_lock); + return; + } + } + l_unlock(&mdt->mdt_namespace->ns_lock); + + /* If the xid matches, then we know this is a resent request, + * and allow it. (It's probably an OPEN, for which we don't + * send a lock */ + if (req->rq_xid == + le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid)) + return; + + /* This remote handle isn't enqueued, so we never received or + * processed this request. Clear MSG_RESENT, because it can + * be handled like any normal request now. */ + + lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT); + + DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64, + remote_hdl.cookie); +} + static int mdt_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *req_cookie, ldlm_mode_t mode, int flags, void *data) { + struct ptlrpc_request *req = req_cookie; + struct ldlm_lock *lock = *lockp; + struct ldlm_intent *it; + struct ldlm_reply *rep; + struct lustre_handle lockh = { 0 }; + struct ldlm_lock *new_lock = NULL; + int getattr_part = MDS_INODELOCK_UPDATE; + int repsize[4] = {sizeof(*rep), + sizeof(struct mdt_body), + sizeof(struct lov_mds_md)};/*FIXME:See mds*/ + int repbufcnt = 3, offset = MDS_REQ_INTENT_REC_OFF; + int rc; + struct mdt_thread_info *info = NULL; + /*FIXME:How to get this pointer? + from the request? or passed in by @data*/ + + ENTRY; - RETURN(ELDLM_LOCK_ABORTED); + + LASSERT(req != NULL); + + if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) { + /* No intent was provided */ + int size = sizeof(struct ldlm_reply); + rc = lustre_pack_reply(req, 1, &size, NULL); + LASSERT(rc == 0); + RETURN(0); + } + + it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it), + lustre_swab_ldlm_intent); + if (it == NULL) { + CERROR("Intent missing\n"); + RETURN(req->rq_status = -EFAULT); + } + + LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc)); + + if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) && + (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP))) + /* we should never allow OBD_CONNECT_ACL if not configured */ + repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE; + else if (it->opc & IT_UNLINK) + repsize[repbufcnt++] = sizeof(struct llog_cookie); /*FIXME:See mds*/ + + rc = lustre_pack_reply(req, repbufcnt, repsize, NULL); + if (rc) + RETURN(req->rq_status = rc); + + rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep)); + intent_set_disposition(rep, DISP_IT_EXECD); + + + /* execute policy */ + switch ((long)it->opc) { + case IT_OPEN: + case IT_CREAT|IT_OPEN: + fixup_handle_for_resent_req(info,req, MDS_REQ_INTENT_LOCKREQ_OFF, + lock, NULL, &lockh); + /* XXX swab here to assert that an mds_open reint + * packet is following */ + rep->lock_policy_res2 = mdt_reint_internal(info, req, + offset, &lockh); +#if 0 + /* We abort the lock if the lookup was negative and + * we did not make it to the OPEN portion */ + if (!intent_disposition(rep, DISP_LOOKUP_EXECD)) + RETURN(ELDLM_LOCK_ABORTED); + if (intent_disposition(rep, DISP_LOOKUP_NEG) && + !intent_disposition(rep, DISP_OPEN_OPEN)) + RETURN(ELDLM_LOCK_ABORTED); +#endif + break; + case IT_LOOKUP: + getattr_part = MDS_INODELOCK_LOOKUP; + case IT_GETATTR: + getattr_part |= MDS_INODELOCK_LOOKUP; + case IT_READDIR: +#if 0 + fixup_handle_for_resent_req(info, req, MDS_REQ_INTENT_LOCKREQ_OFF, + lock, &new_lock, &lockh); + + /* INODEBITS_INTEROP: if this lock was converted from a + * plain lock (client does not support inodebits), then + * child lock must be taken with both lookup and update + * bits set for all operations. + */ + if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS)) + getattr_part = MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE; + + rep->lock_policy_res2 = mds_getattr_name(offset, req, + getattr_part, &lockh); + /* FIXME: LDLM can set req->rq_status. MDS sets + policy_res{1,2} with disposition and status. + - replay: returns 0 & req->status is old status + - otherwise: returns req->status */ + if (intent_disposition(rep, DISP_LOOKUP_NEG)) + rep->lock_policy_res2 = 0; + if (!intent_disposition(rep, DISP_LOOKUP_POS) || + rep->lock_policy_res2) + RETURN(ELDLM_LOCK_ABORTED); + if (req->rq_status != 0) { + LBUG(); + rep->lock_policy_res2 = req->rq_status; + RETURN(ELDLM_LOCK_ABORTED); + } +#endif + RETURN(ELDLM_LOCK_ABORTED); + break; + default: + CERROR("Unhandled intent "LPD64"\n", it->opc); + LBUG(); + } + + /* By this point, whatever function we called above must have either + * filled in 'lockh', been an intent replay, or returned an error. We + * want to allow replayed RPCs to not get a lock, since we would just + * drop it below anyways because lock replay is done separately by the + * client afterwards. For regular RPCs we want to give the new lock to + * the client instead of whatever lock it was about to get. */ + if (new_lock == NULL) + new_lock = ldlm_handle2lock(&lockh); + if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) + RETURN(0); + + LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n", + it->opc, lockh.cookie); + + /* If we've already given this lock to a client once, then we should + * have no readers or writers. Otherwise, we should have one reader + * _or_ writer ref (which will be zeroed below) before returning the + * lock to a client. */ + if (new_lock->l_export == req->rq_export) { + LASSERT(new_lock->l_readers + new_lock->l_writers == 0); + } else { + LASSERT(new_lock->l_export == NULL); + LASSERT(new_lock->l_readers + new_lock->l_writers == 1); + } + + *lockp = new_lock; + + if (new_lock->l_export == req->rq_export) { + /* Already gave this to the client, which means that we + * reconstructed a reply. */ + LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & + MSG_RESENT); + RETURN(ELDLM_LOCK_REPLACED); + } + + /* Fixup the lock to be given to the client */ + l_lock(&new_lock->l_resource->lr_namespace->ns_lock); + new_lock->l_readers = 0; + new_lock->l_writers = 0; + + new_lock->l_export = class_export_get(req->rq_export); + list_add(&new_lock->l_export_chain, + &new_lock->l_export->exp_ldlm_data.led_held_locks); + + new_lock->l_blocking_ast = lock->l_blocking_ast; + new_lock->l_completion_ast = lock->l_completion_ast; + + memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle, + sizeof(lock->l_remote_handle)); + + new_lock->l_flags &= ~LDLM_FL_LOCAL; + + LDLM_LOCK_PUT(new_lock); + l_unlock(&new_lock->l_resource->lr_namespace->ns_lock); + + RETURN(ELDLM_LOCK_REPLACED); } static int mdt_config(struct lu_context *ctx, struct mdt_device *m, -- 1.8.3.1