Whamcloud - gitweb
LU-11549 mdd: set LUSTRE_ORPHAN_FL for non-dirs
[fs/lustre-release.git] / lustre / mdt / mdt_open.c
index 40025f6..bcb7c2f 100644 (file)
@@ -49,7 +49,7 @@ static void mdt_mfd_get(void *mfdp)
 {
 }
 
-static struct portals_handle_ops mfd_open_handle_ops = {
+static const struct portals_handle_ops mfd_open_handle_ops = {
        .hop_addref = mdt_mfd_get,
        .hop_free   = NULL,
 };
@@ -64,7 +64,7 @@ struct mdt_file_data *mdt_mfd_new(const struct mdt_export_data *med)
        OBD_ALLOC_PTR(mfd);
        if (mfd != NULL) {
                INIT_LIST_HEAD_RCU(&mfd->mfd_open_handle.h_link);
-               mfd->mfd_open_handle.h_owner = med;
+               mfd->mfd_owner = med;
                INIT_LIST_HEAD(&mfd->mfd_list);
                class_handle_hash(&mfd->mfd_open_handle, &mfd_open_handle_ops);
        }
@@ -86,9 +86,9 @@ struct mdt_file_data *mdt_open_handle2mfd(struct mdt_export_data *med,
        ENTRY;
 
        LASSERT(open_handle != NULL);
-       mfd = class_handle2object(open_handle->cookie, med);
+       mfd = class_handle2object(open_handle->cookie, &mfd_open_handle_ops);
        /* during dw/setattr replay the mfd can be found by old handle */
-       if (mfd == NULL && is_replay_or_resent) {
+       if ((!mfd || mfd->mfd_owner != med) && is_replay_or_resent) {
                list_for_each_entry(mfd, &med->med_open_head, mfd_list) {
                        if (mfd->mfd_open_handle_old.cookie ==
                            open_handle->cookie)
@@ -394,7 +394,8 @@ static int mdt_mfd_open(struct mdt_thread_info *info, struct mdt_object *p,
                RETURN(rc);
 
        rc = mo_open(info->mti_env, mdt_object_child(o),
-                    created ? open_flags | MDS_OPEN_CREATED : open_flags);
+                    created ? open_flags | MDS_OPEN_CREATED : open_flags,
+                    &info->mti_spec);
        if (rc != 0) {
                /* If we allow the client to chgrp (CFS_SETGRP_PERM), but the
                 * client does not know which suppgid should be sent to the MDS,
@@ -542,6 +543,11 @@ static int mdt_finish_open(struct mdt_thread_info *info,
                RETURN(-EOPNOTSUPP);
        }
 
+       /* Overstriped files can crash older clients */
+       if (isreg && !exp_connect_overstriping(exp) &&
+           mdt_lmm_is_overstriping(ma->ma_lmm))
+               RETURN(-EOPNOTSUPP);
+
        /* LU-2275, simulate broken behaviour (esp. prevalent in
         * pre-2.4 servers where a very strange reply is sent on error
         * that looks like it was actually almost successful and a
@@ -1214,6 +1220,8 @@ static int mdt_cross_open(struct mdt_thread_info *info,
                        if (rc != 0)
                                GOTO(out, rc);
 
+                       mdt_pack_secctx_in_reply(info, o);
+
                        rc = mdt_finish_open(info, NULL, o, open_flags, 0, rep);
                } else {
                        /*
@@ -1358,11 +1366,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                        GOTO(out, result = -EFAULT);
                }
                CDEBUG(D_INFO, "No object(1), continue as regular open.\n");
-       } else if (open_flags & (MDS_OPEN_BY_FID | MDS_OPEN_LOCK)) {
-               /*
-                * MDS_OPEN_LOCK is checked for backward compatibility with 2.1
-                * client.
-                */
+       } else if (open_flags & MDS_OPEN_BY_FID) {
                result = mdt_open_by_fid_lock(info, ldlm_rep, lhc);
                if (result < 0)
                        CDEBUG(D_INFO, "no object for "DFID": %d\n",
@@ -1405,8 +1409,10 @@ again:
 
        fid_zero(child_fid);
 
-       result = mdo_lookup(info->mti_env, mdt_object_child(parent),
-                           &rr->rr_name, child_fid, &info->mti_spec);
+       result = -ENOENT;
+       if ((open_flags & MDS_OPEN_VOLATILE) == 0)
+               result = mdo_lookup(info->mti_env, mdt_object_child(parent),
+                                   &rr->rr_name, child_fid, &info->mti_spec);
 
        LASSERTF(ergo(result == 0, fid_is_sane(child_fid)),
                 "looking for "DFID"/"DNAME", found FID = "DFID"\n",
@@ -1484,7 +1490,8 @@ again:
 
        if (result == -ENOENT) {
                /* Create under OBF and .lustre is not permitted */
-               if (!fid_is_md_operative(rr->rr_fid1))
+               if (!fid_is_md_operative(rr->rr_fid1) &&
+                   (open_flags & MDS_OPEN_VOLATILE) == 0)
                        GOTO(out_child, result = -EPERM);
 
                /* save versions in reply */
@@ -1566,7 +1573,9 @@ again:
                               PNAME(&rr->rr_name), PFID(child_fid));
                        GOTO(out_child, result = -EIO);
                }
-        }
+       }
+
+       mdt_pack_secctx_in_reply(info, child);
 
        rc = mdt_check_resent_lock(info, child, lhc);
        if (rc < 0) {
@@ -1694,7 +1703,7 @@ static struct mdt_object *mdt_orphan_open(struct mdt_thread_info *info,
                GOTO(out, rc);
        }
 
-       rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED);
+       rc = mo_open(env, mdt_object_child(obj), MDS_OPEN_CREATED, spec);
        if (rc < 0)
                CERROR("%s: cannot open volatile file "DFID", orphan "
                       "file will be left in PENDING directory until "
@@ -1737,6 +1746,22 @@ static inline int mdt_hsm_set_released(struct lov_mds_md *lmm)
        return 0;
 }
 
+static inline int mdt_get_lmm_gen(struct lov_mds_md *lmm, __u32 *gen)
+{
+       struct lov_comp_md_v1 *comp_v1;
+
+       if (le32_to_cpu(lmm->lmm_magic == LOV_MAGIC_COMP_V1)) {
+               comp_v1 = (struct lov_comp_md_v1 *)lmm;
+               *gen = le32_to_cpu(comp_v1->lcm_layout_gen);
+       } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 ||
+                  le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) {
+               *gen = le16_to_cpu(lmm->lmm_layout_gen);
+       } else {
+               return -EINVAL;
+       }
+       return 0;
+}
+
 static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
                           struct md_attr *ma)
 {
@@ -1796,19 +1821,66 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        if (rc != 0)
                GOTO(out_unlock, rc);
 
-       if (!mdt_hsm_release_allow(ma))
-               GOTO(out_unlock, rc = -EPERM);
-
-       /* already released? */
-       if (ma->ma_hsm.mh_flags & HS_RELEASED)
-               GOTO(out_unlock, rc = 0);
+       if (ma->ma_attr_flags & MDS_PCC_ATTACH) {
+               if (ma->ma_valid & MA_HSM) {
+                       if (ma->ma_hsm.mh_flags & HS_RELEASED)
+                               GOTO(out_unlock, rc = -EALREADY);
+
+                       if (ma->ma_hsm.mh_arch_id != data->cd_archive_id)
+                               CDEBUG(D_CACHE,
+                                      DFID" archive id diff: %llu:%u\n",
+                                      PFID(mdt_object_fid(o)),
+                                      ma->ma_hsm.mh_arch_id,
+                                      data->cd_archive_id);
+
+                       if (!(ma->ma_hsm.mh_flags & HS_DIRTY) &&
+                           ma->ma_hsm.mh_arch_ver == data->cd_data_version) {
+                               CDEBUG(D_CACHE,
+                                      DFID" data version matches: packed=%llu "
+                                      "and on-disk=%llu\n",
+                                      PFID(mdt_object_fid(o)),
+                                      data->cd_data_version,
+                                      ma->ma_hsm.mh_arch_ver);
+                               ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
+                       }
 
-       /* Compare on-disk and packed data_version */
-       if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
-               CDEBUG(D_HSM, DFID" data_version mismatches: packed=%llu"
-                      " and on-disk=%llu\n", PFID(mdt_object_fid(o)),
-                      data->cd_data_version, ma->ma_hsm.mh_arch_ver);
-               GOTO(out_unlock, rc = -EPERM);
+                       if (ma->ma_hsm.mh_flags & HS_DIRTY)
+                               ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
+               } else {
+                       /* Set up HSM attribte for PCC archived object */
+                       BUILD_BUG_ON(sizeof(struct hsm_attrs) >
+                                    sizeof(info->mti_xattr_buf));
+                       buf = &info->mti_buf;
+                       buf->lb_buf = info->mti_xattr_buf;
+                       buf->lb_len = sizeof(struct hsm_attrs);
+                       memset(&ma->ma_hsm, 0, sizeof(ma->ma_hsm));
+                       ma->ma_hsm.mh_flags = HS_ARCHIVED | HS_EXISTS;
+                       ma->ma_hsm.mh_arch_id = data->cd_archive_id;
+                       ma->ma_hsm.mh_arch_ver = data->cd_data_version;
+                       lustre_hsm2buf(buf->lb_buf, &ma->ma_hsm);
+
+                       rc = mo_xattr_set(info->mti_env, mdt_object_child(o),
+                                         buf, XATTR_NAME_HSM, 0);
+                       if (rc)
+                               GOTO(out_unlock, rc);
+               }
+       } else {
+               if (!mdt_hsm_release_allow(ma))
+                       GOTO(out_unlock, rc = -EPERM);
+
+               /* already released? */
+               if (ma->ma_hsm.mh_flags & HS_RELEASED)
+                       GOTO(out_unlock, rc = 0);
+
+               /* Compare on-disk and packed data_version */
+               if (data->cd_data_version != ma->ma_hsm.mh_arch_ver) {
+                       CDEBUG(D_HSM, DFID" data_version mismatches: "
+                              "packed=%llu and on-disk=%llu\n",
+                              PFID(mdt_object_fid(o)),
+                              data->cd_data_version,
+                              ma->ma_hsm.mh_arch_ver);
+                       GOTO(out_unlock, rc = -EPERM);
+               }
        }
 
        ma->ma_valid = MA_INODE;
@@ -1876,7 +1948,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        }
 
        /* Set up HSM attribute for orphan object */
-       CLASSERT(sizeof(struct hsm_attrs) <= sizeof(info->mti_xattr_buf));
+       BUILD_BUG_ON(sizeof(struct hsm_attrs) > sizeof(info->mti_xattr_buf));
        buf = &info->mti_buf;
        buf->lb_buf = info->mti_xattr_buf;
        buf->lb_len = sizeof(struct hsm_attrs);
@@ -1904,6 +1976,12 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        rc = mo_swap_layouts(info->mti_env, mdt_object_child(o),
                             mdt_object_child(orphan),
                             SWAP_LAYOUTS_MDS_HSM);
+
+       if (!rc && ma->ma_attr_flags & MDS_PCC_ATTACH) {
+               ma->ma_need = MA_LOV;
+               rc = mdt_attr_get_complex(info, o, ma);
+       }
+
        EXIT;
 
 out_layout_lock:
@@ -1930,10 +2008,17 @@ out_unlock:
                repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
                LASSERT(repbody != NULL);
                repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
+               if (ma->ma_attr_flags & MDS_PCC_ATTACH) {
+                       LASSERT(ma->ma_valid & MA_LOV);
+                       rc = mdt_get_lmm_gen(ma->ma_lmm,
+                                            &repbody->mbo_layout_gen);
+                       if (!rc)
+                               repbody->mbo_valid |= OBD_MD_LAYOUT_VERSION;
+               }
        }
 
 out_reprocess:
-       ldlm_reprocess_all(lease->l_resource);
+       ldlm_reprocess_all(lease->l_resource, lease);
        LDLM_LOCK_PUT(lease);
 
        ma->ma_valid = 0;
@@ -2096,7 +2181,7 @@ out_unlock_sem:
 out_obj:
        mdt_object_put(info->mti_env, swap_objects ? o1 : o2);
 
-       ldlm_reprocess_all(lease->l_resource);
+       ldlm_reprocess_all(lease->l_resource, lease);
 
 out_lease:
        LDLM_LOCK_PUT(lease);
@@ -2213,7 +2298,7 @@ out_unlock:
                OBD_FREE(resync_ids, resync_count * sizeof(__u32));
 
 out_reprocess:
-       ldlm_reprocess_all(lease->l_resource);
+       ldlm_reprocess_all(lease->l_resource, lease);
        LDLM_LOCK_PUT(lease);
 
        ma->ma_valid = 0;
@@ -2238,7 +2323,6 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        int rc = 0;
        u64 open_flags;
        u64 intent;
-       bool discard = false;
 
        ENTRY;
 
@@ -2322,7 +2406,8 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
 
        if (!MFD_CLOSED(open_flags)) {
                rc = mo_close(info->mti_env, next, ma, open_flags);
-               discard = mdt_dom_check_for_discard(info, o);
+               if (mdt_dom_check_for_discard(info, o))
+                       mdt_dom_discard_data(info, o);
        }
 
        /* adjust open and lease count */
@@ -2334,9 +2419,6 @@ int mdt_mfd_close(struct mdt_thread_info *info, struct mdt_file_data *mfd)
        mdt_mfd_free(mfd);
        mdt_object_put(info->mti_env, o);
 
-       if (discard)
-               mdt_dom_discard_data(info, ofid);
-
        RETURN(rc);
 }