Whamcloud - gitweb
Land b1_6_elc onto b1_6 (20070621_0218)
[fs/lustre-release.git] / lustre / mdc / mdc_locks.c
index c2937bb..2380297 100644 (file)
@@ -229,48 +229,46 @@ static void mdc_realloc_openmsg(struct ptlrpc_request *req,
 
 /* We always reserve enough space in the reply packet for a stripe MD, because
  * we don't know in advance the file type. */
-int mdc_enqueue(struct obd_export *exp,
-                int lock_type,
-                struct lookup_intent *it,
-                int lock_mode,
-                struct mdc_op_data *data,
-                struct lustre_handle *lockh,
-                void *lmm,
-                int lmmsize,
-                ldlm_completion_callback cb_completion,
-                ldlm_blocking_callback cb_blocking,
-                void *cb_data, int extra_lock_flags)
+int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo,
+                struct lookup_intent *it, struct mdc_op_data *op_data,
+                struct lustre_handle *lockh, void *lmm, int lmmsize,
+                int extra_lock_flags)
 {
         struct ptlrpc_request *req;
         struct obd_device *obddev = class_exp2obd(exp);
         struct ldlm_res_id res_id =
-                { .name = {data->fid1.id, data->fid1.generation} };
+                { .name = {op_data->fid1.id, op_data->fid1.generation} };
         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP } };
         struct ldlm_request *lockreq;
         struct ldlm_intent *lit;
         struct ldlm_reply *lockrep;
         int size[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                         [DLM_LOCKREQ_OFF]     = sizeof(*lockreq),
-                        [DLM_INTENT_IT_OFF]   = sizeof(*lit) };
+                        [DLM_INTENT_IT_OFF]   = sizeof(*lit),
+                        0, 0, 0, 0 };
         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                            [DLM_LOCKREPLY_OFF]   = sizeof(*lockrep),
                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
                            [DLM_REPLY_REC_OFF+1] = obddev->u.cli.
-                                                   cl_max_mds_easize };
+                                                   cl_max_mds_easize, 0 };
         int flags = extra_lock_flags | LDLM_FL_HAS_INTENT;
         int repbufcnt = 4, rc;
         void *eadata;
         ENTRY;
 
-        LASSERTF(lock_type == LDLM_IBITS, "lock type %d\n", lock_type);
+        LASSERTF(einfo->ei_type == LDLM_IBITS,"lock type %d\n", einfo->ei_type);
 //        LDLM_DEBUG_NOLOCK("mdsintent=%s,name=%s,dir=%lu",
 //                          ldlm_it2str(it->it_op), it_name, it_inode->i_ino);
 
         if (it->it_op & IT_OPEN) {
+                CFS_LIST_HEAD(cancels);
+                int count = 0;
+                int mode;
+                
                 it->it_create_mode |= S_IFREG;
 
                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_create);
-                size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
+                size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
                 /* As an optimization, we allocate an RPC request buffer for
                  * at least a default-sized LOV EA even if we aren't sending
                  * one.  We grow the whole request to the next power-of-two
@@ -286,27 +284,40 @@ int mdc_enqueue(struct obd_export *exp,
                                  min(size[DLM_INTENT_REC_OFF+2]+round_up(rc)-rc,
                                      obddev->u.cli.cl_max_mds_easize);
 
-                if (it->it_flags & O_JOIN_FILE) {
-                        __u64 head_size = *(__u32*)cb_data;
-                        __u32 tsize = *(__u32*)lmm;
+                /* If inode is known, cancel conflicting OPEN locks. */
+                if (op_data->fid2.id) {
+                        if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
+                                mode = LCK_CW;
+#ifdef FMODE_EXEC
+                        else if (it->it_flags & FMODE_EXEC)
+                                mode = LCK_PR;
+#endif
+                        else 
+                                mode = LCK_CR;
+                        count = mdc_resource_get_unused(exp, &op_data->fid2,
+                                                        &cancels, mode,
+                                                        MDS_INODELOCK_OPEN);
+                }
 
+                /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
+                if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
+                        mode = LCK_EX;
+                else
+                        mode = LCK_CR;
+                count += mdc_resource_get_unused(exp, &op_data->fid1, &cancels,
+                                                 mode, MDS_INODELOCK_UPDATE);
+                if (it->it_flags & O_JOIN_FILE) {
                         /* join is like an unlink of the tail */
                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
                         size[DLM_INTENT_REC_OFF + 3] =
                                                  sizeof(struct mds_rec_join);
-                        req = ptlrpc_prep_req(class_exp2cliimp(exp),
-                                              LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
-                                              7, size, NULL);
-                        /* when joining file, cb_data and lmm args together
-                         * indicate the head file size*/
-                        mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, data,
-                                      (head_size << 32) | tsize);
-                        cb_data = NULL;
-                        lmm = NULL;
+                        req = ldlm_prep_enqueue_req(exp, 7, size, &cancels,
+                                                    count);
+                        mdc_join_pack(req, DLM_INTENT_REC_OFF + 3, op_data,
+                                      (*(__u64 *)op_data->data));
                 } else {
-                        req = ptlrpc_prep_req(class_exp2cliimp(exp),
-                                              LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
-                                              6, size, NULL);
+                        req = ldlm_prep_enqueue_req(exp, 6, size, &cancels,
+                                                    count);
                 }
 
                 if (!req)
@@ -322,16 +333,16 @@ int mdc_enqueue(struct obd_export *exp,
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_open_pack(req, DLM_INTENT_REC_OFF, data, it->it_create_mode,
-                              0, it->it_flags, lmm, lmmsize);
+                mdc_open_pack(req, DLM_INTENT_REC_OFF, op_data,
+                              it->it_create_mode, 0, it->it_flags,
+                              lmm, lmmsize);
 
                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
         } else if (it->it_op & IT_UNLINK) {
                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_rec_unlink);
-                size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
+                size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                                      LDLM_ENQUEUE, 5, size, NULL);
+                req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -341,7 +352,7 @@ int mdc_enqueue(struct obd_export *exp,
                 lit->opc = (__u64)it->it_op;
 
                 /* pack the intended request */
-                mdc_unlink_pack(req, DLM_INTENT_REC_OFF, data);
+                mdc_unlink_pack(req, DLM_INTENT_REC_OFF, op_data);
 
                 repsize[repbufcnt++] = obddev->u.cli.cl_max_mds_cookiesize;
         } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
@@ -349,13 +360,12 @@ int mdc_enqueue(struct obd_export *exp,
                                   OBD_MD_FLACL | OBD_MD_FLMODEASIZE |
                                   OBD_MD_FLDIREA;
                 size[DLM_INTENT_REC_OFF] = sizeof(struct mds_body);
-                size[DLM_INTENT_REC_OFF + 1] = data->namelen + 1;
+                size[DLM_INTENT_REC_OFF + 1] = op_data->namelen + 1;
 
                 if (it->it_op & IT_GETATTR)
                         policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
 
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                                      LDLM_ENQUEUE, 5, size, NULL);
+                req = ldlm_prep_enqueue_req(exp, 5, size, NULL, 0);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -366,13 +376,12 @@ int mdc_enqueue(struct obd_export *exp,
 
                 /* pack the intended request */
                 mdc_getattr_pack(req, DLM_INTENT_REC_OFF, valid,
-                                 it->it_flags, data);
+                                 it->it_flags, op_data);
 
                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
         } else if (it->it_op == IT_READDIR) {
                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
-                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
-                                      LDLM_ENQUEUE, 2, size, NULL);
+                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                 if (!req)
                         RETURN(-ENOMEM);
 
@@ -390,9 +399,8 @@ int mdc_enqueue(struct obd_export *exp,
           * rpcs in flight counter */
         mdc_get_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
         mdc_enter_request(&obddev->u.cli);
-        rc = ldlm_cli_enqueue(exp, &req, res_id, lock_type, &policy,
-                              lock_mode, &flags, cb_blocking, cb_completion,
-                              NULL, cb_data, NULL, 0, NULL, lockh, 0);
+        rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, &policy, &flags, NULL,
+                              0, NULL, lockh, 0);
         mdc_exit_request(&obddev->u.cli);
         mdc_put_rpc_lock(obddev->u.cli.cl_rpc_lock, it);
 
@@ -407,7 +415,7 @@ int mdc_enqueue(struct obd_export *exp,
         /* This can go when we're sure that this can never happen */
         LASSERT(rc != -ENOENT);
         if (rc == ELDLM_LOCK_ABORTED) {
-                lock_mode = 0;
+                einfo->ei_mode = 0;
                 memset(lockh, 0, sizeof(*lockh));
                 rc = 0;
         } else if (rc != 0) {
@@ -422,10 +430,10 @@ int mdc_enqueue(struct obd_export *exp,
 
                 /* If the server gave us back a different lock mode, we should
                  * fix up our variables. */
-                if (lock->l_req_mode != lock_mode) {
+                if (lock->l_req_mode != einfo->ei_mode) {
                         ldlm_lock_addref(lockh, lock->l_req_mode);
-                        ldlm_lock_decref(lockh, lock_mode);
-                        lock_mode = lock->l_req_mode;
+                        ldlm_lock_decref(lockh, einfo->ei_mode);
+                        einfo->ei_mode = lock->l_req_mode;
                 }
                 LDLM_LOCK_PUT(lock);
         }
@@ -437,7 +445,7 @@ int mdc_enqueue(struct obd_export *exp,
 
         it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1;
         it->d.lustre.it_status = (int)lockrep->lock_policy_res2;
-        it->d.lustre.it_lock_mode = lock_mode;
+        it->d.lustre.it_lock_mode = einfo->ei_mode;
         it->d.lustre.it_data = req;
 
         if (it->d.lustre.it_status < 0 && req->rq_replay)
@@ -624,11 +632,12 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
          * this and use the request from revalidate.  In this case, revalidate
          * never dropped its reference, so the refcounts are all OK */
         if (!it_disposition(it, DISP_ENQ_COMPLETE)) {
+                struct ldlm_enqueue_info einfo =
+                        { LDLM_IBITS, it_to_lock_mode(it), cb_blocking,
+                          ldlm_completion_ast, NULL, NULL };
 
-                rc = mdc_enqueue(exp, LDLM_IBITS, it, it_to_lock_mode(it),
-                                 op_data, &lockh, lmm, lmmsize,
-                                 ldlm_completion_ast, cb_blocking, NULL,
-                                 extra_lock_flags);
+                rc = mdc_enqueue(exp, &einfo, it, op_data, &lockh,
+                                 lmm, lmmsize, extra_lock_flags);
                 if (rc < 0)
                         RETURN(rc);
                 memcpy(&it->d.lustre.it_lock_handle, &lockh, sizeof(lockh));
@@ -687,6 +696,8 @@ int mdc_intent_lock(struct obd_export *exp, struct mdc_op_data *op_data,
             !it_open_error(DISP_OPEN_OPEN, it)) {
                 it_set_disposition(it, DISP_ENQ_OPEN_REF);
                 ptlrpc_request_addref(request); /* balanced in ll_file_open */
+                /* BUG 11546 - eviction in the middle of open rpc processing */
+                OBD_FAIL_TIMEOUT(OBD_FAIL_MDC_ENQUEUE_PAUSE, obd_timeout);
         }
 
         if (it->it_op & IT_CREAT) {