Whamcloud - gitweb
LU-8367 osp: enable replay for precreation request
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
index 8579158..dd934aa 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/ofd/ofd_objects.c
  *
@@ -64,8 +63,6 @@ static int ofd_version_get_check(struct ofd_thread_info *info,
 {
        dt_obj_version_t curr_version;
 
-       LASSERT(ofd_object_exists(fo));
-
        if (info->fti_exp == NULL)
                RETURN(0);
 
@@ -280,7 +277,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
                RETURN(rc = -ENOSPC);
        }
 
-       OBD_ALLOC(batch, nr_saved * sizeof(*batch));
+       OBD_ALLOC_PTR_ARRAY(batch, nr_saved);
        if (batch == NULL)
                RETURN(-ENOMEM);
 
@@ -359,7 +356,7 @@ int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
                }
        }
 
-       rc = dt_trans_start_local(env, ofd->ofd_osd, th);
+       rc = dt_trans_start(env, ofd->ofd_osd, th);
        if (rc)
                GOTO(trans_stop, rc);
 
@@ -462,7 +459,7 @@ out:
                        continue;
                ofd_object_put(env, fo);
        }
-       OBD_FREE(batch, nr_saved * sizeof(*batch));
+       OBD_FREE_PTR_ARRAY(batch, nr_saved);
 
        CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
               "created %d/%d objects: %d\n", objects, nr_saved, rc);
@@ -621,8 +618,9 @@ int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
                        ff->ff_layout_version = oa->o_layout_version;
                        ff->ff_range = 0;
                } else if (oa->o_layout_version > ff->ff_layout_version) {
-                       ff->ff_range = MAX(ff->ff_range,
-                                 oa->o_layout_version - ff->ff_layout_version);
+                       ff->ff_range = max_t(__u32, ff->ff_range,
+                                            oa->o_layout_version -
+                                            ff->ff_layout_version);
                }
        }
 
@@ -741,6 +739,121 @@ out:
 }
 
 /**
+ * Fallocate(Preallocate) space for OFD object.
+ *
+ * This function allocates space for the object from the \a start
+ * offset to the \a end offset.
+ *
+ * \param[in] env      execution environment
+ * \param[in] fo       OFD object
+ * \param[in] start    start offset to allocate from
+ * \param[in] end      end of allocate
+ * \param[in] mode     fallocate mode
+ * \param[in] la       object attributes
+ * \param[in] ff       filter_fid structure
+ *
+ * \retval             0 if successful
+ * \retval             negative value on error
+ */
+int ofd_object_fallocate(const struct lu_env *env, struct ofd_object *fo,
+                        __u64 start, __u64 end, int mode, struct lu_attr *la,
+                        struct obdo *oa)
+{
+       struct ofd_thread_info *info = ofd_info(env);
+       struct ofd_device *ofd = ofd_obj2dev(fo);
+       struct dt_object *dob = ofd_object_child(fo);
+       struct thandle *th;
+       struct filter_fid *ff = &info->fti_mds_fid;
+       bool ff_needed = false;
+       int rc;
+
+       ENTRY;
+
+       if (!ofd_object_exists(fo))
+               RETURN(-ENOENT);
+
+       /* VBR: version recovery check */
+       rc = ofd_version_get_check(info, fo);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (ff != NULL) {
+               rc = ofd_object_ff_load(env, fo);
+               if (rc == -ENODATA)
+                       ff_needed = true;
+               else if (rc < 0)
+                       RETURN(rc);
+
+               if (ff_needed) {
+                       if (oa->o_valid & OBD_MD_FLFID) {
+                               ff->ff_parent.f_seq = oa->o_parent_seq;
+                               ff->ff_parent.f_oid = oa->o_parent_oid;
+                               ff->ff_parent.f_ver = oa->o_stripe_idx;
+                       }
+                       if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
+                               ff->ff_layout = oa->o_layout;
+                       if (oa->o_valid & OBD_MD_LAYOUT_VERSION)
+                               ff->ff_layout_version = oa->o_layout_version;
+                       filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
+               }
+       }
+
+       th = ofd_trans_create(env, ofd);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = dt_declare_attr_set(env, dob, la, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_declare_fallocate(env, dob, start, end, mode, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       if (ff_needed) {
+               info->fti_buf.lb_buf = ff;
+               info->fti_buf.lb_len = sizeof(*ff);
+               rc = dt_declare_xattr_set(env, ofd_object_child(fo),
+                                         &info->fti_buf, XATTR_NAME_FID, 0,
+                                         th);
+               if (rc)
+                       GOTO(stop, rc);
+       }
+
+       rc = ofd_trans_start(env, ofd, fo, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       ofd_read_lock(env, fo);
+       if (!ofd_object_exists(fo))
+               GOTO(unlock, rc = -ENOENT);
+
+       if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
+               tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
+                              info->fti_xid);
+
+       rc = dt_falloc(env, dob, start, end, mode, th);
+       if (rc)
+               GOTO(unlock, rc);
+
+       rc = dt_attr_set(env, dob, la, th);
+       if (rc)
+               GOTO(unlock, rc);
+
+       if (ff_needed) {
+               rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
+                                 XATTR_NAME_FID, 0, th);
+               if (!rc)
+                       filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
+       }
+unlock:
+       ofd_read_unlock(env, fo);
+stop:
+       ofd_trans_stop(env, ofd, th, rc);
+       RETURN(rc);
+}
+
+/**
  * Truncate/punch OFD object.
  *
  * This function frees all of the allocated object's space from the \a start
@@ -833,6 +946,17 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
                oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
        }
 
+       if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & LUSTRE_ENCRYPT_FL) {
+               /* punch must be aware we are dealing with an encrypted file */
+               struct lu_attr la = {
+                       .la_valid = LA_FLAGS,
+                       .la_flags = LUSTRE_ENCRYPT_FL,
+               };
+
+               rc = dt_attr_set(env, dob, &la, th);
+               if (rc)
+                       GOTO(unlock, rc);
+       }
        rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
        if (rc)
                GOTO(unlock, rc);
@@ -924,14 +1048,14 @@ int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
 
        ofd_write_lock(env, fo);
        if (!ofd_object_exists(fo))
-               GOTO(stop, rc = -ENOENT);
+               GOTO(unlock, rc = -ENOENT);
 
        tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
 
        dt_ref_del(env, ofd_object_child(fo), th);
        dt_destroy(env, ofd_object_child(fo), th);
+unlock:
        ofd_write_unlock(env, fo);
-
 stop:
        rc2 = ofd_trans_stop(env, ofd, th, rc);
        if (rc2)