Whamcloud - gitweb
LU-1876 hsm: layout lock implementation on server side
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
index a44945c..4e266be 100644 (file)
@@ -1246,13 +1246,12 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                 LDLM_LOCK_PUT(lock);
                 rc = 0;
         } else {
+               bool try_layout = false;
+
 relock:
                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
                 mdt_lock_handle_init(lhc);
-                if (child_bits == MDS_INODELOCK_LAYOUT)
-                        mdt_lock_reg_init(lhc, LCK_CR);
-                else
-                        mdt_lock_reg_init(lhc, LCK_PR);
+               mdt_lock_reg_init(lhc, LCK_PR);
 
                 if (mdt_object_exists(child) == 0) {
                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
@@ -1270,12 +1269,6 @@ relock:
                         if (unlikely(rc != 0))
                                 GOTO(out_child, rc);
 
-                        /* layout lock is used only on regular files */
-                        if ((ma->ma_valid & MA_INODE) &&
-                            (ma->ma_attr.la_valid & LA_MODE) &&
-                            !S_ISREG(ma->ma_attr.la_mode))
-                                child_bits &= ~MDS_INODELOCK_LAYOUT;
-
                         /* If the file has not been changed for some time, we
                          * return not only a LOOKUP lock, but also an UPDATE
                          * lock and this might save us RPC on later STAT. For
@@ -1288,9 +1281,35 @@ relock:
                                 child_bits |= MDS_INODELOCK_UPDATE;
                 }
 
-                rc = mdt_object_lock(info, child, lhc, child_bits,
-                                     MDT_CROSS_LOCK);
+               /* layout lock must be granted in a best-effort way
+                * for IT operations */
+               LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
+               if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
+                   exp_connect_layout(info->mti_exp) &&
+                   S_ISREG(lu_object_attr(&child->mot_obj.mo_lu)) &&
+                   ldlm_rep != NULL) {
+                       /* try to grant layout lock for regular file. */
+                       try_layout = true;
+               }
 
+               rc = 0;
+               if (try_layout) {
+                       child_bits |= MDS_INODELOCK_LAYOUT;
+                       /* try layout lock, it may fail to be granted due to
+                        * contention at LOOKUP or UPDATE */
+                       if (!mdt_object_lock_try(info, child, lhc, child_bits,
+                                                MDT_CROSS_LOCK)) {
+                               child_bits &= ~MDS_INODELOCK_LAYOUT;
+                               LASSERT(child_bits != 0);
+                               rc = mdt_object_lock(info, child, lhc,
+                                               child_bits, MDT_CROSS_LOCK);
+                       } else {
+                               ma_need |= MA_LOV;
+                       }
+               } else {
+                       rc = mdt_object_lock(info, child, lhc, child_bits,
+                                               MDT_CROSS_LOCK);
+               }
                 if (unlikely(rc != 0))
                         GOTO(out_child, rc);
         }
@@ -1300,7 +1319,7 @@ relock:
         if (lock &&
             lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE &&
             S_ISREG(lu_object_attr(&mdt_object_child(child)->mo_lu)))
-                ma_need = MA_SOM;
+                ma_need |= MA_SOM;
 
         /* finally, we can get attr for child. */
         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
@@ -2131,6 +2150,7 @@ int mdt_llog_prev_block(struct mdt_thread_info *info)
 /*
  * DLM handlers.
  */
+
 static struct ldlm_callback_suite cbs = {
        .lcs_completion = ldlm_server_completion_ast,
        .lcs_blocking   = ldlm_server_blocking_ast,
@@ -2362,12 +2382,14 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         RETURN(rc);
 }
 
-int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
-                    struct mdt_lock_handle *lh, __u64 ibits, int locality)
+static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
+                           struct mdt_lock_handle *lh, __u64 ibits,
+                           bool nonblock, int locality)
 {
         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
         ldlm_policy_data_t *policy = &info->mti_policy;
         struct ldlm_res_id *res_id = &info->mti_res_id;
+       __u64 dlmflags;
         int rc;
         ENTRY;
 
@@ -2404,6 +2426,10 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
         memset(policy, 0, sizeof(*policy));
         fid_build_reg_res_name(mdt_object_fid(o), res_id);
 
+       dlmflags = LDLM_FL_ATOMIC_CB;
+       if (nonblock)
+               dlmflags |= LDLM_FL_BLOCK_NOWAIT;
+
         /*
          * Take PDO lock on whole directory and build correct @res_id for lock
          * on part of directory.
@@ -2419,7 +2445,7 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
                          */
                         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
                         rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
-                                          policy, res_id, LDLM_FL_ATOMIC_CB,
+                                          policy, res_id, dlmflags,
                                           &info->mti_exp->exp_handle.h_cookie);
                         if (unlikely(rc))
                                 RETURN(rc);
@@ -2440,7 +2466,7 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
          * fix it up and turn FL_LOCAL flag off.
          */
         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
-                          res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB,
+                          res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
                           &info->mti_exp->exp_handle.h_cookie);
         if (rc)
                 mdt_object_unlock(info, o, lh, 1);
@@ -2453,6 +2479,25 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
         RETURN(rc);
 }
 
+int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
+                   struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+       return mdt_object_lock0(info, o, lh, ibits, false, locality);
+}
+
+int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
+                       struct mdt_lock_handle *lh, __u64 ibits, int locality)
+{
+       struct mdt_lock_handle tmp = *lh;
+       int rc;
+
+       rc = mdt_object_lock0(info, o, &tmp, ibits, true, locality);
+       if (rc == 0)
+               *lh = tmp;
+
+       return rc == 0;
+}
+
 /**
  * Save a lock within request object.
  *
@@ -3215,6 +3260,10 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
                               struct mdt_thread_info *info,
                               struct ldlm_lock **,
                              __u64);
+static int mdt_intent_layout(enum mdt_it_code opcode,
+                            struct mdt_thread_info *info,
+                            struct ldlm_lock **,
+                            __u64);
 static int mdt_intent_reint(enum mdt_it_code opcode,
                             struct mdt_thread_info *info,
                             struct ldlm_lock **,
@@ -3279,11 +3328,11 @@ static struct mdt_it_flavor {
                 .it_flags = 0,
                 .it_act   = NULL
         },
-        [MDT_IT_LAYOUT] = {
-                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
-                .it_flags = HABEO_REFERO,
-                .it_act   = mdt_intent_getattr
-        }
+       [MDT_IT_LAYOUT] = {
+               .it_fmt   = &RQF_LDLM_INTENT_LAYOUT,
+               .it_flags = 0,
+               .it_act   = mdt_intent_layout
+       }
 };
 
 int mdt_intent_lock_replace(struct mdt_thread_info *info,
@@ -3460,16 +3509,6 @@ static int mdt_intent_getattr(enum mdt_it_code opcode,
         case MDT_IT_GETATTR:
                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                 break;
-        case MDT_IT_LAYOUT: {
-                static int printed = 0;
-
-                if (!printed) {
-                        CERROR("layout lock not supported by this version\n");
-                        printed = 1;
-                }
-                GOTO(out_shrink, rc = -EINVAL);
-                break;
-        }
         default:
                 CERROR("Unsupported intent (%d)\n", opcode);
                 GOTO(out_shrink, rc = -EINVAL);
@@ -3509,6 +3548,39 @@ out_shrink:
         return rc;
 }
 
+static int mdt_intent_layout(enum mdt_it_code opcode,
+                            struct mdt_thread_info *info,
+                            struct ldlm_lock **lockp,
+                            __u64 flags)
+{
+       struct layout_intent *layout;
+       int rc;
+       ENTRY;
+
+       if (opcode != MDT_IT_LAYOUT) {
+               CERROR("%s: Unknown intent (%d)\n",
+                       info->mti_exp->exp_obd->obd_name, opcode);
+               RETURN(-EINVAL);
+       }
+
+       (*lockp)->l_lvb_type = LVB_T_LAYOUT;
+       req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
+                       ldlm_lvbo_size(*lockp));
+       rc = req_capsule_server_pack(info->mti_pill);
+       if (rc != 0)
+               RETURN(-EINVAL);
+
+       layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
+       LASSERT(layout != NULL);
+       if (layout->li_opc == LAYOUT_INTENT_ACCESS)
+               /* return to normal ldlm handling */
+               RETURN(0);
+
+       CERROR("%s: Unsupported layout intent (%d)\n",
+               info->mti_exp->exp_obd->obd_name, layout->li_opc);
+       RETURN(-EINVAL);
+}
+
 static int mdt_intent_reint(enum mdt_it_code opcode,
                             struct mdt_thread_info *info,
                             struct ldlm_lock **lockp,
@@ -3585,7 +3657,7 @@ static int mdt_intent_reint(enum mdt_it_code opcode,
                   */
                 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
                         LASSERTF(rc == 0, "Error occurred but lock handle "
-                                 "is still in use\n");
+                                 "is still in use, rc = %d\n", rc);
                         rep->lock_policy_res2 = 0;
                         rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
                         RETURN(rc);
@@ -3672,14 +3744,6 @@ static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
                RETURN(rc);
        }
 
-       if (opc == MDT_IT_LAYOUT) {
-               (*lockp)->l_lvb_type = LVB_T_LAYOUT;
-               /* XXX: set replay RMF_DLM_LVB as the real EA size when LAYOUT
-                *      lock enabled. */
-       } else if (opc == MDT_IT_READDIR) {
-               req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
-       }
-
        flv  = &mdt_it_flavor[opc];
         if (flv->it_fmt != NULL)
                 req_capsule_extend(pill, flv->it_fmt);