Whamcloud - gitweb
LU-13456 ldlm: fix reprocessing of locks with more bits
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 877c723..bc5ad6a 100644 (file)
@@ -867,8 +867,8 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        GOTO(out, rc = -EPROTO);
                }
 
-               /* XXX: only exclusive open is supported. */
-               lm = LCK_EX;
+               /* should conflict with new opens for write/execute */
+               lm = LCK_PW;
                *ibits = MDS_INODELOCK_OPEN;
 
                /* never grant LCK_EX layout lock to client */
@@ -986,8 +986,11 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        PFID(mdt_object_fid(obj)),
                        atomic_read(&obj->mot_open_count), open_count);
 
-               if (atomic_read(&obj->mot_open_count) > open_count)
-                       GOTO(out, rc = -EBUSY);
+               if (atomic_read(&obj->mot_open_count) > open_count) {
+                       /* fail if anyone *else* has opened file for write */
+                       if (mdt_write_read(obj) > 1)
+                               GOTO(out, rc = -EBUSY);
+               }
        }
        GOTO(out, rc);
 
@@ -1301,7 +1304,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
        struct ptlrpc_request *req = mdt_info_req(info);
        struct mdt_object *parent;
        struct mdt_object *child;
-       struct mdt_lock_handle *lh;
+       struct mdt_lock_handle *lh = NULL;
        struct ldlm_reply *ldlm_rep;
        struct mdt_body *repbody;
        struct lu_fid *child_fid = &info->mti_tmp_fid1;
@@ -1403,20 +1406,23 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
 
        OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
 again_pw:
-       lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(lh, lock_mode, &rr->rr_name);
-
-       result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
-       if (result != 0) {
-               mdt_object_put(info->mti_env, parent);
-               GOTO(out, result);
-       }
        fid_zero(child_fid);
 
-       result = -ENOENT;
-       if ((open_flags & MDS_OPEN_VOLATILE) == 0)
+       if (open_flags & MDS_OPEN_VOLATILE) {
+               lh = NULL;
+               result = -ENOENT;
+       } else {
+               lh = &info->mti_lh[MDT_LH_PARENT];
+               mdt_lock_pdo_init(lh, lock_mode, &rr->rr_name);
+               result = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
+               if (result != 0) {
+                       mdt_object_put(info->mti_env, parent);
+                       GOTO(out, result);
+               }
+
                result = mdo_lookup(info->mti_env, mdt_object_child(parent),
                                    &rr->rr_name, child_fid, &info->mti_spec);
+       }
 
        LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
                 "looking for "DFID"/"DNAME", found FID = "DFID"\n",
@@ -1435,7 +1441,9 @@ again_pw:
                if (mdt_rdonly(req->rq_export))
                        GOTO(out_parent, result = -EROFS);
 
-               if (lock_mode == LCK_PR) {
+               LASSERT(equi(lh == NULL, open_flags & MDS_OPEN_VOLATILE));
+
+               if (lh != NULL && lock_mode == LCK_PR) {
                        /* first pass: get write lock and restart */
                        mdt_object_unlock(info, parent, lh, 1);
                        mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
@@ -1483,10 +1491,6 @@ again_pw:
                /* Not found and with MDS_OPEN_CREAT: let's create it. */
                mdt_set_disposition(info, ldlm_rep, DISP_OPEN_CREATE);
 
-               /* Let lower layers know what is lock mode on directory. */
-               info->mti_spec.sp_cr_mode =
-                       mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
-
                /* Don't do lookup sanity check. We know name doesn't exist. */
                info->mti_spec.sp_cr_lookup = 0;
                info->mti_spec.sp_feat = &dt_directory_features;
@@ -1639,7 +1643,10 @@ out_child:
        if (result == 0)
                mdt_pack_size2body(info, child_fid, &lhc->mlh_reg_lh);
 out_parent:
-       mdt_object_unlock_put(info, parent, lh, result || !created);
+       if (lh != NULL)
+               mdt_object_unlock(info, parent, lh, result || !created);
+
+       mdt_object_put(info->mti_env, parent);
 out:
        if (result)
                lustre_msg_set_transno(req->rq_repmsg, 0);
@@ -1664,7 +1671,7 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
                .ln_namelen = sizeof("i_am_nobody") - 1,
        };
        struct lu_ucred *uc;
-       cfs_cap_t uc_cap_save;
+       kernel_cap_t uc_cap_save;
        int rc;
        ENTRY;
 
@@ -1682,7 +1689,6 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
 
        spec->sp_cr_lookup = 0;
        spec->sp_feat = &dt_directory_features;
-       spec->sp_cr_mode = MDL_MINMODE; /* no lock */
        spec->sp_cr_flags = MDS_OPEN_VOLATILE | fmode;
        if (attr->ma_valid & MA_LOV) {
                spec->u.sp_ea.eadata = attr->ma_lmm;
@@ -1694,7 +1700,7 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
 
        uc = lu_ucred(env);
        uc_cap_save = uc->uc_cap;
-       uc->uc_cap |= BIT(CAP_DAC_OVERRIDE);
+       cap_raise(uc->uc_cap, CAP_DAC_OVERRIDE);
        rc = mdo_create(env, mdt_object_child(local_root), &lname,
                        mdt_object_child(obj), spec, attr);
        uc->uc_cap = uc_cap_save;
@@ -1773,7 +1779,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        struct mdt_object      *orphan;
        struct md_attr         *orp_ma;
        struct lu_buf          *buf;
-       cfs_cap_t               cap;
+       kernel_cap_t cap;
        bool                    lease_broken;
        int                     rc;
        int                     rc2;
@@ -1966,7 +1972,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        /* The orphan has root ownership so we need to raise
         * CAP_FOWNER to set the HSM attributes. */
        cap = uc->uc_cap;
-       uc->uc_cap |= MD_CAP_TO_MASK(CAP_FOWNER);
+       cap_raise(uc->uc_cap, CAP_FOWNER);
        rc = mo_xattr_set(info->mti_env, mdt_object_child(orphan), buf,
                          XATTR_NAME_HSM, 0);
        uc->uc_cap = cap;
@@ -2019,7 +2025,8 @@ out_unlock:
        }
 
 out_reprocess:
-       ldlm_reprocess_all(lease->l_resource, lease);
+       ldlm_reprocess_all(lease->l_resource,
+                          lease->l_policy_data.l_inodebits.bits);
        LDLM_LOCK_PUT(lease);
 
        ma->ma_valid = 0;
@@ -2162,8 +2169,11 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
                        mrd.mrd_obj = NULL;
                }
 
-               if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT)
+               if (ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT) {
                        mrd.mrd_mirror_id = data->cd_mirror_id;
+                       /* set a small enough block count in the SoM */
+                       ma->ma_attr.la_blocks >>= 1;
+               }
 
                buf->lb_len = sizeof(mrd);
                buf->lb_buf = &mrd;
@@ -2171,11 +2181,18 @@ int mdt_close_handle_layouts(struct mdt_thread_info *info,
                                  XATTR_LUSTRE_LOV,
                                  ma->ma_attr_flags & MDS_CLOSE_LAYOUT_SPLIT ?
                                  LU_XATTR_SPLIT : LU_XATTR_MERGE);
-               if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS)) {
+               if (rc == 0 && ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS |
+                                                      LA_LSIZE | LA_LBLOCKS)) {
                        int rc2;
+                       enum lustre_som_flags lsf;
+
+                       if (ma->ma_attr.la_valid & (LA_SIZE | LA_BLOCKS))
+                               lsf = SOM_FL_STRICT;
+                       else
+                               lsf = SOM_FL_LAZY;
 
                        mutex_lock(&o->mot_som_mutex);
-                       rc2 = mdt_set_som(info, o, SOM_FL_STRICT,
+                       rc2 = mdt_set_som(info, o, lsf,
                                          ma->ma_attr.la_size,
                                          ma->ma_attr.la_blocks);
                        mutex_unlock(&o->mot_som_mutex);
@@ -2219,7 +2236,8 @@ out_obj:
                /* the 2nd object has been used, and not swapped */
                mdt_object_put(info->mti_env, o2);
 
-       ldlm_reprocess_all(lease->l_resource, lease);
+       ldlm_reprocess_all(lease->l_resource,
+                          lease->l_policy_data.l_inodebits.bits);
 
 out_lease:
        LDLM_LOCK_PUT(lease);
@@ -2256,7 +2274,7 @@ static int mdt_close_resync_done(struct mdt_thread_info *info,
        if (data == NULL)
                RETURN(-EPROTO);
 
-       if (ptlrpc_req_need_swab(mdt_info_req(info)))
+       if (req_capsule_req_need_swab(info->mti_pill))
                lustre_swab_close_data_resync_done(&data->cd_resync);
 
        if (!fid_is_zero(&data->cd_fid))
@@ -2340,7 +2358,8 @@ out_unlock:
                OBD_FREE_PTR_ARRAY(resync_ids, resync_count);
 
 out_reprocess:
-       ldlm_reprocess_all(lease->l_resource, lease);
+       ldlm_reprocess_all(lease->l_resource,
+                          lease->l_policy_data.l_inodebits.bits);
        LDLM_LOCK_PUT(lease);
 
        ma->ma_valid = 0;