Whamcloud - gitweb
LU-11186 ofd: fix for a final oid at sequence
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
index 9f76081..7605de4 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -152,8 +152,7 @@ int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
 
        if (unlikely(rc < sizeof(struct lu_fid))) {
                fid_zero(&ff->ff_parent);
-
-               return -ENODATA;
+               return -EINVAL;
        }
 
        filter_fid_le_to_cpu(ff, ff, rc);
@@ -208,12 +207,12 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
 
        /* Don't create objects beyond the valid range for this SEQ */
        if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
-                    (id + nr) >= IDIF_MAX_OID)) {
+                    (id + nr) > IDIF_MAX_OID)) {
                CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
                       ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
                RETURN(rc = -ENOSPC);
        } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
-                           (id + nr) >= OBIF_MAX_OID)) {
+                           (id + nr) > OBIF_MAX_OID)) {
                CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
                       ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
                RETURN(rc = -ENOSPC);
@@ -474,6 +473,100 @@ int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
 }
 
 /**
+ * Check if it needs to update filter_fid by the value of @oa.
+ *
+ * \param[in] env      env
+ * \param[in] fo       ofd object
+ * \param[in] oa       obdo from client or MDT
+ * \param[out] ff      if filter_fid needs updating, this field is used to
+ *                     return the new buffer
+ *
+ * \retval < 0         error occurred
+ * \retval 0           doesn't need to update filter_fid
+ * \retval FL_XATTR_{CREATE,REPLACE}   flag for xattr update
+ */
+int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
+                        const struct obdo *oa, struct filter_fid *ff)
+{
+       int rc = 0;
+       ENTRY;
+
+       if (!(oa->o_valid &
+             (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
+               RETURN(0);
+
+       rc = ofd_object_ff_load(env, fo);
+       if (rc < 0 && rc != -ENODATA)
+               RETURN(rc);
+
+       LASSERT(ff != &fo->ofo_ff);
+       if (rc == -ENODATA) {
+               rc = LU_XATTR_CREATE;
+               memset(ff, 0, sizeof(*ff));
+       } else {
+               rc = LU_XATTR_REPLACE;
+               memcpy(ff, &fo->ofo_ff, sizeof(*ff));
+       }
+
+       if (oa->o_valid & OBD_MD_FLFID) {
+               /* packing fid and converting it to LE for storing into EA.
+                * Here ->o_stripe_idx should be filled by LOV and rest of
+                * fields - by client. */
+               ff->ff_parent.f_seq = oa->o_parent_seq;
+               ff->ff_parent.f_oid = oa->o_parent_oid;
+               /* XXX: we are ignoring o_parent_ver here, since this should
+                *      be the same for all objects in this fileset. */
+               ff->ff_parent.f_ver = oa->o_stripe_idx;
+       }
+       if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
+               ff->ff_layout = oa->o_layout;
+
+       if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+               CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
+                      PFID(&fo->ofo_ff.ff_parent),
+                      PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
+                      ff->ff_layout_version, oa->o_layout_version);
+
+               /* only the MDS has the authority to update layout version */
+               if (!(exp_connect_flags(ofd_info(env)->fti_exp) &
+                     OBD_CONNECT_MDS)) {
+                       CERROR(DFID": update layout version from client\n",
+                              PFID(&fo->ofo_ff.ff_parent));
+
+                       RETURN(-EPERM);
+               }
+
+               if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
+                       /* this opens a new era of writing */
+                       ff->ff_layout_version = 0;
+                       ff->ff_range = 0;
+               }
+
+               /* it's not allowed to change it to a smaller value */
+               if (oa->o_layout_version < ff->ff_layout_version)
+                       RETURN(-EINVAL);
+
+               if (ff->ff_layout_version == 0 ||
+                   oa->o_layout_version & LU_LAYOUT_RESYNC) {
+                       /* if LU_LAYOUT_RESYNC is set, it closes the era of
+                        * writing. Only mirror I/O can write this object. */
+                       ff->ff_layout_version = oa->o_layout_version;
+                       ff->ff_range = 0;
+               } else if (oa->o_layout_version > ff->ff_layout_version) {
+                       ff->ff_range = MAX(ff->ff_range,
+                                 oa->o_layout_version - ff->ff_layout_version);
+               }
+       }
+
+       if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
+               filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
+       else /* no change */
+               rc = 0;
+
+       RETURN(rc);
+}
+
+/**
  * Set OFD object attributes.
  *
  * This function sets OFD object attributes taken from incoming request.
@@ -484,19 +577,20 @@ int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
  * \param[in] env      execution environment
  * \param[in] fo       OFD object
  * \param[in] la       object attributes
- * \param[in] ff       filter_fid structure, contains additional attributes
+ * \param[in] oa       obdo carries fid, ost_layout, layout version
  *
  * \retval             0 if successful
  * \retval             negative value on error
  */
 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
-                struct lu_attr *la, struct filter_fid *ff)
+                struct lu_attr *la, struct obdo *oa)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_device       *ofd = ofd_obj2dev(fo);
+       struct filter_fid       *ff = &info->fti_mds_fid;
        struct thandle          *th;
        struct ofd_mod_data     *fmd;
-       int                     ff_needed = 0;
+       int                     fl;
        int                     rc;
        int                     rc2;
        ENTRY;
@@ -521,13 +615,9 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
        if (rc != 0)
                GOTO(unlock, rc);
 
-       if (ff != NULL) {
-               rc = ofd_object_ff_load(env, fo);
-               if (rc == -ENODATA)
-                       ff_needed = 1;
-               else if (rc < 0)
-                       GOTO(unlock, rc);
-       }
+       fl = ofd_object_ff_update(env, fo, oa, ff);
+       if (fl < 0)
+               GOTO(unlock, rc = fl);
 
        th = ofd_trans_create(env, ofd);
        if (IS_ERR(th))
@@ -537,7 +627,7 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed) {
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
                        ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
                else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
@@ -546,7 +636,7 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
                info->fti_buf.lb_buf = ff;
                info->fti_buf.lb_len = sizeof(*ff);
                rc = dt_declare_xattr_set(env, ofd_object_child(fo),
-                                         &info->fti_buf, XATTR_NAME_FID, 0,
+                                         &info->fti_buf, XATTR_NAME_FID, fl,
                                          th);
                if (rc)
                        GOTO(stop, rc);
@@ -560,12 +650,14 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed) {
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
                        GOTO(stop, rc);
 
+               info->fti_buf.lb_buf = ff;
+               info->fti_buf.lb_len = sizeof(*ff);
                rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
-                                 XATTR_NAME_FID, 0, th);
+                                 XATTR_NAME_FID, fl, th);
                if (!rc)
                        filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
        }
@@ -599,7 +691,6 @@ unlock:
  * \param[in] start    start offset to punch from
  * \param[in] end      end of punch
  * \param[in] la       object attributes
- * \param[in] ff       filter_fid structure
  * \param[in] oa       obdo struct from incoming request
  *
  * \retval             0 if successful
@@ -607,14 +698,15 @@ unlock:
  */
 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                     __u64 start, __u64 end, struct lu_attr *la,
-                    struct filter_fid *ff, struct obdo *oa)
+                    struct obdo *oa)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_device       *ofd = ofd_obj2dev(fo);
        struct ofd_mod_data     *fmd;
        struct dt_object        *dob = ofd_object_child(fo);
+       struct filter_fid       *ff = &info->fti_mds_fid;
        struct thandle          *th;
-       int                     ff_needed = 0;
+       int                     fl;
        int                     rc;
        int                     rc2;
 
@@ -638,6 +730,15 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                        GOTO(unlock, rc);
        }
 
+       /* need to verify layout version */
+       if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+               rc = ofd_verify_layout_version(env, fo, oa);
+               if (rc)
+                       GOTO(unlock, rc);
+
+               oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+       }
+
        /* VBR: version recovery check */
        rc = ofd_version_get_check(info, fo);
        if (rc)
@@ -647,13 +748,9 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
        if (rc != 0)
                GOTO(unlock, rc);
 
-       if (ff != NULL) {
-               rc = ofd_object_ff_load(env, fo);
-               if (rc == -ENODATA)
-                       ff_needed = 1;
-               else if (rc < 0)
-                       GOTO(unlock, rc);
-       }
+       fl = ofd_object_ff_update(env, fo, oa, ff);
+       if (fl < 0)
+               GOTO(unlock, rc = fl);
 
        th = ofd_trans_create(env, ofd);
        if (IS_ERR(th))
@@ -667,7 +764,7 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed) {
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
                        ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
                else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
@@ -676,7 +773,7 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                info->fti_buf.lb_buf = ff;
                info->fti_buf.lb_len = sizeof(*ff);
                rc = dt_declare_xattr_set(env, ofd_object_child(fo),
-                                         &info->fti_buf, XATTR_NAME_FID, 0,
+                                         &info->fti_buf, XATTR_NAME_FID, fl,
                                          th);
                if (rc)
                        GOTO(stop, rc);
@@ -694,12 +791,12 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
        if (rc)
                GOTO(stop, rc);
 
-       if (ff_needed) {
+       if (fl) {
                if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
                        GOTO(stop, rc);
 
                rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
-                                 XATTR_NAME_FID, 0, th);
+                                 XATTR_NAME_FID, fl, th);
                if (!rc)
                        filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
        }