Whamcloud - gitweb
more intent lock implementation.
authorhuanghua <huanghua>
Thu, 1 Jun 2006 09:50:45 +0000 (09:50 +0000)
committerhuanghua <huanghua>
Thu, 1 Jun 2006 09:50:45 +0000 (09:50 +0000)
lustre/mdt/mdt_handler.c

index 411a87c..c2e772a 100644 (file)
@@ -164,42 +164,105 @@ static int mdt_getattr(struct mdt_thread_info *info)
         RETURN(result);
 }
 
-static int mdt_getattr_name(struct mdt_thread_info *info)
+/* @ Huang Hua
+ * UPDATE lock should be taken against parent, and be release before exit;
+ * child_bits lock should be taken against child, and be returned back:
+ *            (1)normal request will release child lock;
+ *            (2)intent request will grant it to client.
+ */
+static int mdt_getattr_name_internal(struct mdt_thread_info *info, 
+                                     int child_bits)
 {
-        struct md_object  *next = mdt_object_child(info->mti_object);
+        struct mdt_object *parent = info->mti_object;
         struct mdt_object *child;
+        struct md_object  *next = mdt_object_child(info->mti_object);
         struct mdt_body   *body;
         const char *name;
         int result;
+        struct mdt_lock_handle *lhp;
+        struct mdt_lock_handle *lhc;
+        struct ldlm_reply *ldlm_rep;
+
+        ENTRY;
+
+        lhp = &info->mti_lh[MDT_LH_PARENT];
+        lhp->mlh_mode = LCK_CR;
+        lhc = &info->mti_lh[MDT_LH_CHILD];
+        lhc->mlh_mode = LCK_CR;
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
 
         LASSERT(info->mti_object != NULL);
 
-        ENTRY;
+        intent_set_disposition(ldlm_rep, DISP_IT_EXECD);
 
         body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
         if (name == NULL)
                 RETURN(-EFAULT);
 
+        /*step 1: lock parent */
+        result = mdt_object_lock(info->mti_mdt->mdt_namespace, parent, 
+                                 lhp, MDS_INODELOCK_UPDATE);
+        if (result != 0)
+                RETURN(result);
+        
+        /*step 2: lookup child's fid by name */
         result = mdo_lookup(info->mti_ctxt, next, name, &body->fid1);
-        if (result == 0) {
-                child = mdt_object_find(info->mti_ctxt, info->mti_mdt,
-                                        &body->fid1);
-                if (!IS_ERR(child)) {
-                        result = mo_attr_get(info->mti_ctxt, next,
-                                             &info->mti_attr);
-                        if (result == 0) {
-                                mdt_pack_attr2body(body, &info->mti_attr);
-                                body->valid |= OBD_MD_FLID;
-                        }
-                        mdt_object_put(info->mti_ctxt, child);
-                } else
-                        result = PTR_ERR(child);
+        if (result != 0) 
+                GOTO(out_parent, result);
+
+        /*step 3: find the child object by fid */
+        child = mdt_object_find(info->mti_ctxt, info->mti_mdt, &body->fid1);
+        if (IS_ERR(child))
+                GOTO(out_parent, result = PTR_ERR(child));
+
+        if (!lu_object_exists(info->mti_ctxt, &child->mot_obj.mo_lu)) {
+                intent_set_disposition(ldlm_rep, DISP_LOOKUP_NEG);
+                GOTO(out_child, result = -ENOENT);
+        } else {
+                intent_set_disposition(ldlm_rep, DISP_LOOKUP_POS);
         }
+        /*step 4: lock child: this lock is returned back to caller
+         *                    if successfully get attr. 
+         */
+        result = mdt_object_lock(info->mti_mdt->mdt_namespace, child, 
+                                 lhc, child_bits);
+        if (result != 0)
+                GOTO(out_child, result);
+
+        /* finally, we can get attr for child. */
+        result = mo_attr_get(info->mti_ctxt, next, &info->mti_attr);
+        if (result == 0) {
+                mdt_pack_attr2body(body, &info->mti_attr);
+                body->fid1 = *mdt_object_fid(child);
+                body->valid |= OBD_MD_FLID;
+        } else
+                mdt_object_unlock(info->mti_mdt->mdt_namespace, child, lhc);
 
+out_child:
+        mdt_object_put(info->mti_ctxt, child);
+out_parent:
+        mdt_object_unlock(info->mti_mdt->mdt_namespace, parent, lhp);
         RETURN(result);
 }
 
+/* see the above function */
+static int mdt_getattr_name(struct mdt_thread_info *info)
+{
+        struct mdt_lock_handle *lhc;
+        int rc;
+
+        ENTRY;
+        lhc = &info->mti_lh[MDT_LH_CHILD];
+        lhc->mlh_mode = LCK_CR;
+
+        rc = mdt_getattr_name_internal(info, MDS_INODELOCK_UPDATE);
+        if (rc == 0 && lustre_handle_is_used(&lhc->mlh_lh))
+                       ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
+        RETURN(rc);
+}
+
 static struct lu_device_operations mdt_lu_ops;
 
 static int lu_device_is_mdt(struct lu_device *d)
@@ -1114,15 +1177,21 @@ enum mdt_it_code {
 };
 
 static int mdt_intent_getattr(enum mdt_it_code opcode,
-                              struct mdt_thread_info *info);
+                              struct mdt_thread_info *info,
+                              struct ldlm_lock **,
+                              int);
 static int mdt_intent_reint(enum mdt_it_code opcode,
-                            struct mdt_thread_info *info);
+                            struct mdt_thread_info *info,
+                            struct ldlm_lock **,
+                            int);
 
 static struct mdt_it_flavor {
         const struct req_format *it_fmt;
         __u32                    it_flags;
         int                    (*it_act)(enum mdt_it_code ,
-                                         struct mdt_thread_info *);
+                                         struct mdt_thread_info *,
+                                         struct ldlm_lock **,
+                                         int);
         long                     it_reint;
 } mdt_it_flavor[] = {
         [MDT_IT_OPEN]     = {
@@ -1177,50 +1246,104 @@ static struct mdt_it_flavor {
 };
 
 static int mdt_intent_getattr(enum mdt_it_code opcode,
-                              struct mdt_thread_info *info)
+                              struct mdt_thread_info *info,
+                              struct ldlm_lock **lockp,
+                              int flags)
 {
-        int getattr_part = MDS_INODELOCK_UPDATE;
+        int child_bits;
+        struct ldlm_lock *old_lock = *lockp;
+        struct ldlm_lock *new_lock = NULL;
+        struct mdt_lock_handle *lhc;
+        struct ptlrpc_request *req = mdt_info_req(info);
+        struct ldlm_reply *ldlm_rep;
+
+        ENTRY;
         
         switch (opcode) {
         case MDT_IT_LOOKUP:
-                getattr_part = MDS_INODELOCK_LOOKUP;
+                child_bits = MDS_INODELOCK_LOOKUP;
                 break;
         case MDT_IT_GETATTR:
-                getattr_part |= MDS_INODELOCK_LOOKUP;
+                child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                 break;
         default:
+                CERROR("Unhandled till now");
+                RETURN(-EINVAL);
                 break;
         }
-        /*FIXME do something on the above flag */
+        
+        ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        intent_set_disposition(ldlm_rep, DISP_LOOKUP_EXECD);
+        /* lock on child object is returned @info->mti_lh[MDT_LH_CHILD] */
+        ldlm_rep->lock_policy_res2 = mdt_getattr_name_internal(info, child_bits);
+
+        if (intent_disposition(ldlm_rep, DISP_LOOKUP_NEG))
+                ldlm_rep->lock_policy_res2 = 0;
+        if (!intent_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
+                        ldlm_rep->lock_policy_res2) {
+                RETURN(ELDLM_LOCK_ABORTED);
+        }
+
+        lhc = &info->mti_lh[MDT_LH_CHILD];
+        new_lock = ldlm_handle2lock(&lhc->mlh_lh);
+        if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
+                RETURN(0);
+
+        *lockp = new_lock;
+
+        /* Fixup the lock to be given to the client */
+        l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
+        new_lock->l_readers = 0;
+        new_lock->l_writers = 0;
+
+        new_lock->l_export = class_export_get(req->rq_export);
+        list_add(&new_lock->l_export_chain,
+                 &new_lock->l_export->exp_ldlm_data.led_held_locks);
+
+        new_lock->l_blocking_ast = old_lock->l_blocking_ast;
+        new_lock->l_completion_ast = old_lock->l_completion_ast;
 
-        return mdt_getattr_name(info);
+        new_lock->l_remote_handle = old_lock->l_remote_handle;
+
+        new_lock->l_flags &= ~LDLM_FL_LOCAL;
+
+        LDLM_LOCK_PUT(old_lock);
+        l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
+
+        RETURN(ELDLM_LOCK_REPLACED);
 }
 
 static int mdt_intent_reint(enum mdt_it_code opcode,
-                            struct mdt_thread_info *info)
+                            struct mdt_thread_info *info,
+                            struct ldlm_lock **lockp,
+                            int flags)
 {
         long opc;
-        int  rc;
-
+        struct ldlm_reply *rep;
+        
         static const struct req_format *intent_fmts[REINT_MAX] = {
                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
         };
 
         ENTRY;
+        rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
+        if (rep == NULL) 
+                RETURN(-EFAULT);
 
         opc = mdt_reint_opcode(info, intent_fmts);
-        if (opc >= 0) {
-                if (mdt_it_flavor[opcode].it_reint == opc)
-                        rc = mdt_reint_internal(info, opc);
-                else {
-                        CERROR("Reint code %ld doesn't match intent: %d\n",
-                               opc, opcode);
-                        rc = -EPROTO;
-                }
-        } else
-                rc = opc;
-        RETURN(rc);
+        if (opc < 0) 
+                RETURN(opc);
+
+        if (mdt_it_flavor[opcode].it_reint != opc) {
+                CERROR("Reint code %ld doesn't match intent: %d\n",
+                       opc, opcode);
+                RETURN(-EPROTO);
+        }
+        rep->lock_policy_res2 = mdt_reint_internal(info, opc);
+        intent_set_disposition(rep, DISP_IT_EXECD);
+
+        RETURN(ELDLM_LOCK_ABORTED);
 }
 
 static int mdt_intent_code(long itcode)
@@ -1263,7 +1386,8 @@ static int mdt_intent_code(long itcode)
         return result;
 }
 
-static int mdt_intent_opc(long itopc, struct mdt_thread_info *info)
+static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
+                          struct ldlm_lock **lockp, int flags)
 {
         int opc;
         int rc;
@@ -1274,34 +1398,26 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info)
         ENTRY;
 
         opc = mdt_intent_code(itopc);
-        if (opc >= 0) {
-                pill = &info->mti_pill;
-                flv  = &mdt_it_flavor[opc];
-
-                if (flv->it_fmt != NULL)
-                        req_capsule_extend(pill, flv->it_fmt);
-
-                rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
-                if (rc == 0) {
-                        struct ldlm_reply *rep;
-
-                        rep = req_capsule_server_get(pill, &RMF_DLM_REP);
-                        if (rep != NULL) {
-                                /* execute policy */
-                                /*XXX LASSERT( flv->it_act) */
-                                if (flv->it_act) {
-                                        rc = flv->it_act(opc, info);
-                                        rep->lock_policy_res2 = rc;
-                                        intent_set_disposition(rep, 
-                                                               DISP_IT_EXECD);
-                                        rc = 0;
-                                } else
-                                        rc = -EOPNOTSUPP;
-                        } else
-                                rc = -EFAULT;
-                }
-        } else
-                rc = -EINVAL;
+        if (opc < 0)
+                RETURN(-EINVAL);
+        
+        pill = &info->mti_pill;
+        flv  = &mdt_it_flavor[opc];
+
+        if (flv->it_fmt != NULL)
+                req_capsule_extend(pill, flv->it_fmt);
+
+        rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
+        if (rc)
+                RETURN(rc);
+
+
+         /* execute policy */
+         /*XXX LASSERT( flv->it_act) */
+         if (flv->it_act) {
+                 rc = flv->it_act(opc, info, lockp, flags);
+         } else
+                 rc = -EOPNOTSUPP;
 
         RETURN(rc);
 }
@@ -1333,7 +1449,7 @@ static int mdt_intent_policy(struct ldlm_namespace *ns,
                         LDLM_DEBUG(lock, "intent policy opc: %s",
                                    ldlm_it2str(it->opc));
 
-                        rc = mdt_intent_opc(it->opc, info);
+                        rc = mdt_intent_opc(it->opc, info, lockp, flags);
                         if (rc == 0)
                                 rc = ELDLM_OK;
                 } else