Whamcloud - gitweb
add intent support intially, and much work are still needed.
authorhuanghua <huanghua>
Thu, 27 Apr 2006 06:58:21 +0000 (06:58 +0000)
committerhuanghua <huanghua>
Thu, 27 Apr 2006 06:58:21 +0000 (06:58 +0000)
lustre/mdt/mdt_handler.c

index 07c94ad..0ce79af 100644 (file)
@@ -53,6 +53,9 @@
 /* lu2dt_dev() */
 #include <linux/dt_object.h>
 
+/*LUSTRE_POSIX_ACL_MAX_SIZE*/
+#include <linux/lustre_acl.h>
+
 
 /* struct mds_client_data */
 #include "../mds/mds_internal.h"
@@ -246,6 +249,30 @@ static int mdt_readpage(struct mdt_thread_info *info,
         return -EOPNOTSUPP;
 }
 
+int mdt_reint_internal(struct mdt_thread_info *info,
+                       struct ptlrpc_request *req, 
+                       int offset,
+                       struct lustre_handle *lockh)
+{
+        struct mdt_reint_record *rec; /* 116 bytes on the stack?  no sir! */
+        int rc;
+
+        OBD_ALLOC(rec, sizeof(*rec));
+        if (rec == NULL)
+                RETURN(-ENOMEM);
+
+        rc = mdt_reint_unpack(info, req, offset, rec);
+        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
+                CERROR("invalid record\n");
+                GOTO(out, rc = -EINVAL);
+        }
+
+        rc = mdt_reint_rec(info, rec, offset, req, lockh);
+out:
+        OBD_FREE(rec, sizeof(*rec));
+        RETURN(rc);
+}
+
 static int mdt_reint(struct mdt_thread_info *info,
                      struct ptlrpc_request *req, int offset)
 {
@@ -255,7 +282,6 @@ static int mdt_reint(struct mdt_thread_info *info,
         int size[] = { sizeof(struct mdt_body), sizeof(struct lov_mds_md), /*FIXME*/
                        sizeof(struct llog_cookie)};
         int bufcount,rc;
-        struct mdt_reint_record *rec; /* 116 bytes on the stack?  no sir! */
 
         ENTRY;
 
@@ -279,20 +305,7 @@ static int mdt_reint(struct mdt_thread_info *info,
         rc = lustre_pack_reply(req, bufcount, size, NULL);
         if (rc)
                 RETURN (rc);
-
-        OBD_ALLOC(rec, sizeof(*rec));
-        if (rec == NULL)
-                RETURN(-ENOMEM);
-
-        rc = mdt_reint_unpack(info, req, offset, rec);
-        if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
-                CERROR("invalid record\n");
-                GOTO(out, req->rq_status = -EINVAL);
-        }
-
-        rc = mdt_reint_rec(info, rec, offset, req, NULL);
-out:
-        OBD_FREE(rec, sizeof(*rec));
+        rc = mdt_reint_internal(info, req, offset, NULL);
         RETURN(rc);
 }
 
@@ -958,12 +971,249 @@ static int mdt_handle(struct ptlrpc_request *req)
         RETURN(result);
 }
 
+/*Please move these function from mds to mdt*/
+int intent_disposition(struct ldlm_reply *rep, int flag)
+{
+        if (!rep)
+                return 0;
+        return (rep->lock_policy_res1 & flag);
+}
+
+void intent_set_disposition(struct ldlm_reply *rep, int flag)
+{
+        if (!rep)
+                return;
+        rep->lock_policy_res1 |= flag;
+}
+
+static void fixup_handle_for_resent_req(struct mdt_thread_info *info,
+                                        struct ptlrpc_request *req, 
+                                        int offset,
+                                        struct ldlm_lock *new_lock,
+                                        struct ldlm_lock **old_lock,
+                                        struct lustre_handle *lockh)
+{
+        struct obd_export *exp = req->rq_export;
+        struct mdt_device * mdt = info->mti_mdt;
+        struct ldlm_request *dlmreq =
+                lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
+        struct lustre_handle remote_hdl = dlmreq->lock_handle1;
+        struct list_head *iter;
+
+        if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
+                return;
+
+        l_lock(&mdt->mdt_namespace->ns_lock);
+        list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
+                struct ldlm_lock *lock;
+                lock = list_entry(iter, struct ldlm_lock, l_export_chain);
+                if (lock == new_lock)
+                        continue;
+                if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
+                        lockh->cookie = lock->l_handle.h_cookie;
+                        LDLM_DEBUG(lock, "restoring lock cookie");
+                        DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
+                                  lockh->cookie);
+                        if (old_lock)
+                                *old_lock = LDLM_LOCK_GET(lock);
+                        l_unlock(&mdt->mdt_namespace->ns_lock);
+                        return;
+                }
+        }
+        l_unlock(&mdt->mdt_namespace->ns_lock);
+
+        /* If the xid matches, then we know this is a resent request,
+         * and allow it. (It's probably an OPEN, for which we don't
+         * send a lock */
+        if (req->rq_xid ==
+            le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
+                return;
+
+        /* This remote handle isn't enqueued, so we never received or
+         * processed this request.  Clear MSG_RESENT, because it can
+         * be handled like any normal request now. */
+
+        lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
+
+        DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
+                  remote_hdl.cookie);
+}
+
 static int mdt_intent_policy(struct ldlm_namespace *ns,
                              struct ldlm_lock **lockp, void *req_cookie,
                              ldlm_mode_t mode, int flags, void *data)
 {
+        struct ptlrpc_request *req = req_cookie;
+        struct ldlm_lock *lock = *lockp;
+        struct ldlm_intent *it;
+        struct ldlm_reply *rep;
+        struct lustre_handle lockh = { 0 };
+        struct ldlm_lock *new_lock = NULL;
+        int getattr_part = MDS_INODELOCK_UPDATE;
+        int repsize[4] = {sizeof(*rep),
+                          sizeof(struct mdt_body),
+                          sizeof(struct lov_mds_md)};/*FIXME:See mds*/
+        int repbufcnt = 3, offset = MDS_REQ_INTENT_REC_OFF;
+        int rc;
+        struct mdt_thread_info *info = NULL;
+        /*FIXME:How to get this pointer?
+                from the request? or passed in by @data*/ 
+
+
         ENTRY;
-        RETURN(ELDLM_LOCK_ABORTED);
+
+        LASSERT(req != NULL);
+
+        if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
+                /* No intent was provided */
+                int size = sizeof(struct ldlm_reply);
+                rc = lustre_pack_reply(req, 1, &size, NULL);
+                LASSERT(rc == 0);
+                RETURN(0);
+        }
+
+        it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
+                                lustre_swab_ldlm_intent);
+        if (it == NULL) {
+                CERROR("Intent missing\n");
+                RETURN(req->rq_status = -EFAULT);
+        }
+
+        LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
+
+        if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
+            (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP)))
+                /* we should never allow OBD_CONNECT_ACL if not configured */
+                repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
+        else if (it->opc & IT_UNLINK)
+                repsize[repbufcnt++] = sizeof(struct llog_cookie); /*FIXME:See mds*/
+
+        rc = lustre_pack_reply(req, repbufcnt, repsize, NULL);
+        if (rc)
+                RETURN(req->rq_status = rc);
+
+        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
+        intent_set_disposition(rep, DISP_IT_EXECD);
+
+
+        /* execute policy */
+        switch ((long)it->opc) {
+        case IT_OPEN:
+        case IT_CREAT|IT_OPEN:
+                fixup_handle_for_resent_req(info,req, MDS_REQ_INTENT_LOCKREQ_OFF,
+                                            lock, NULL, &lockh);
+                /* XXX swab here to assert that an mds_open reint
+                 * packet is following */
+                rep->lock_policy_res2 = mdt_reint_internal(info, req,
+                                                offset, &lockh);
+#if 0
+                /* We abort the lock if the lookup was negative and
+                 * we did not make it to the OPEN portion */
+                if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
+                        RETURN(ELDLM_LOCK_ABORTED);
+                if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
+                    !intent_disposition(rep, DISP_OPEN_OPEN))
+                        RETURN(ELDLM_LOCK_ABORTED);
+#endif
+                break;
+        case IT_LOOKUP:
+                        getattr_part = MDS_INODELOCK_LOOKUP;
+        case IT_GETATTR:
+                        getattr_part |= MDS_INODELOCK_LOOKUP;
+        case IT_READDIR:
+#if 0
+                fixup_handle_for_resent_req(info, req, MDS_REQ_INTENT_LOCKREQ_OFF,
+                                            lock, &new_lock, &lockh);
+
+                /* INODEBITS_INTEROP: if this lock was converted from a
+                 * plain lock (client does not support inodebits), then
+                 * child lock must be taken with both lookup and update
+                 * bits set for all operations.
+                 */
+                if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
+                        getattr_part = MDS_INODELOCK_LOOKUP |
+                                       MDS_INODELOCK_UPDATE;
+
+                rep->lock_policy_res2 = mds_getattr_name(offset, req,
+                                                         getattr_part, &lockh);
+                /* FIXME: LDLM can set req->rq_status. MDS sets
+                   policy_res{1,2} with disposition and status.
+                   - replay: returns 0 & req->status is old status
+                   - otherwise: returns req->status */
+                if (intent_disposition(rep, DISP_LOOKUP_NEG))
+                        rep->lock_policy_res2 = 0;
+                if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
+                    rep->lock_policy_res2)
+                        RETURN(ELDLM_LOCK_ABORTED);
+                if (req->rq_status != 0) {
+                        LBUG();
+                        rep->lock_policy_res2 = req->rq_status;
+                        RETURN(ELDLM_LOCK_ABORTED);
+                }
+#endif
+                RETURN(ELDLM_LOCK_ABORTED);
+                break;
+        default:
+                CERROR("Unhandled intent "LPD64"\n", it->opc);
+                LBUG();
+        }
+
+        /* By this point, whatever function we called above must have either
+         * filled in 'lockh', been an intent replay, or returned an error.  We
+         * want to allow replayed RPCs to not get a lock, since we would just
+         * drop it below anyways because lock replay is done separately by the
+         * client afterwards.  For regular RPCs we want to give the new lock to
+         * the client instead of whatever lock it was about to get. */
+        if (new_lock == NULL)
+                new_lock = ldlm_handle2lock(&lockh);
+        if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
+                RETURN(0);
+
+        LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
+                 it->opc, lockh.cookie);
+
+        /* If we've already given this lock to a client once, then we should
+         * have no readers or writers.  Otherwise, we should have one reader
+         * _or_ writer ref (which will be zeroed below) before returning the
+         * lock to a client. */
+        if (new_lock->l_export == req->rq_export) {
+                LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
+        } else {
+                LASSERT(new_lock->l_export == NULL);
+                LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
+        }
+
+        *lockp = new_lock;
+
+        if (new_lock->l_export == req->rq_export) {
+                /* Already gave this to the client, which means that we
+                 * reconstructed a reply. */
+                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
+                        MSG_RESENT);
+                RETURN(ELDLM_LOCK_REPLACED);
+        }
+
+        /* Fixup the lock to be given to the client */
+        l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
+        new_lock->l_readers = 0;
+        new_lock->l_writers = 0;
+
+        new_lock->l_export = class_export_get(req->rq_export);
+        list_add(&new_lock->l_export_chain,
+                 &new_lock->l_export->exp_ldlm_data.led_held_locks);
+
+        new_lock->l_blocking_ast = lock->l_blocking_ast;
+        new_lock->l_completion_ast = lock->l_completion_ast;
+
+        memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
+               sizeof(lock->l_remote_handle));
+
+        new_lock->l_flags &= ~LDLM_FL_LOCAL;
+
+        LDLM_LOCK_PUT(new_lock);
+        l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
+
+        RETURN(ELDLM_LOCK_REPLACED);
 }
 
 static int mdt_config(struct lu_context *ctx, struct mdt_device *m,