Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / osp / osp_object.c
index a2f245b..38910f1 100644 (file)
@@ -340,13 +340,21 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
        return oxe;
 }
 
-/* whether \a oxe is large enough to hold XATTR value */
+/**
+ * Check whether \a oxe is large enough to hold the xattr value
+ *
+ * \param[in] oxe      pointer to the OSP object attributes cache xattr entry
+ * \param[in] len      xattr value size in bytes
+ *
+ * \retval             true if xattr can fit in \a oxe
+ * \retval             false if xattr can not fit in \a oxe
+ */
 static inline bool oxe_can_hold(struct osp_xattr_entry *oxe, size_t len)
 {
        if (unlikely(oxe->oxe_largebuf))
-               return oxe->oxe_buflen > len;
+               return oxe->oxe_buflen >= len;
 
-       return oxe->oxe_buflen - oxe->oxe_namelen - 1 - sizeof(*oxe) > len;
+       return oxe->oxe_buflen - oxe->oxe_namelen - 1 - sizeof(*oxe) >= len;
 }
 
 /**
@@ -1003,7 +1011,7 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        LASSERT(buf != NULL);
        LASSERT(name != NULL);
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) &&
+       if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_NETWORK) &&
            osp->opd_index == cfs_fail_val) {
                if (is_ost_obj(&dt->do_lu)) {
                        if (osp_dev2node(osp) == cfs_fail_val)
@@ -1468,7 +1476,7 @@ static int osp_declare_create(const struct lu_env *env, struct dt_object *dt,
        /* should happen to non-0 OSP only so that at least one object
         * has been already declared in the scenario and LOD should
         * cleanup that */
-       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL) && d->opd_index == 1)
+       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_OSC_CREATE_FAIL) && d->opd_index == 1)
                RETURN(-ENOSPC);
 
        LASSERT(d->opd_last_used_oid_file);
@@ -1550,12 +1558,14 @@ static int osp_create(const struct lu_env *env, struct dt_object *dt,
                      struct lu_attr *attr, struct dt_allocation_hint *hint,
                      struct dt_object_format *dof, struct thandle *th)
 {
-       struct osp_thread_info  *osi = osp_env_info(env);
-       struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
-       struct osp_object       *o = dt2osp_obj(dt);
-       int                     rc = 0;
-       struct lu_fid           *fid = &osi->osi_fid;
-       struct thandle          *local_th;
+       struct osp_thread_info *osi = osp_env_info(env);
+       struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev);
+       struct osp_object *o = dt2osp_obj(dt);
+       struct lu_fid *fid = &osi->osi_fid;
+       struct thandle *local_th;
+       bool replay = false;
+       int rc = 0;
+
        ENTRY;
 
        if (is_only_remote_trans(th) &&
@@ -1573,19 +1583,21 @@ static int osp_create(const struct lu_env *env, struct dt_object *dt,
        if (o->opo_reserved) {
                /* regular case, fid is assigned holding transaction open */
                 osp_object_assign_fid(env, d, o);
+       } else {
+               replay = true;
        }
 
        memcpy(fid, lu_object_fid(&dt->do_lu), sizeof(*fid));
 
-       LASSERTF(fid_is_sane(fid), "fid for osp_object %p is insane"DFID"!\n",
+       LASSERTF(fid_is_sane(fid), "fid for osp_object %px is insane"DFID"!\n",
                 o, PFID(fid));
 
-       if (!o->opo_reserved) {
+       if (replay) {
                /* special case, id was assigned outside of transaction
                 * see comments in osp_declare_attr_set */
                LASSERT(d->opd_pre != NULL);
                spin_lock(&d->opd_pre_lock);
-               osp_update_last_fid(d, fid);
+               osp_update_last_fid(d, fid, true);
                spin_unlock(&d->opd_pre_lock);
        }
 
@@ -1595,7 +1607,7 @@ static int osp_create(const struct lu_env *env, struct dt_object *dt,
         * the new sequence soon, all the creation should be synchronized,
         * otherwise during replay, the replay fid will be inconsistent with
         * last_used/create fid */
-       if (osp_precreate_end_seq(env, d) && osp_is_fid_client(d))
+       if (osp_precreate_end_seq(d) && osp_is_fid_client(d))
                th->th_sync = 1;
 
        local_th = osp_get_storage_thandle(env, th, d);
@@ -1662,8 +1674,8 @@ static int osp_create(const struct lu_env *env, struct dt_object *dt,
  * \retval             0 for success
  * \retval             negative error number on failure
  */
-int osp_declare_destroy(const struct lu_env *env, struct dt_object *dt,
-                       struct thandle *th)
+static int osp_declare_destroy(const struct lu_env *env, struct dt_object *dt,
+                              struct thandle *th)
 {
        struct osp_object       *o = dt2osp_obj(dt);
        struct osp_device       *osp = lu2osp_dev(dt->do_lu.lo_dev);
@@ -1673,7 +1685,7 @@ int osp_declare_destroy(const struct lu_env *env, struct dt_object *dt,
 
        LASSERT(!osp->opd_connect_mdt);
 
-       if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
+       if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ))
                rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
 
        RETURN(rc);
@@ -1709,7 +1721,7 @@ static int osp_destroy(const struct lu_env *env, struct dt_object *dt,
 
        LASSERT(!osp->opd_connect_mdt);
 
-       if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) {
+       if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) {
                /* once transaction is committed put proper command on
                 * the queue going to our OST. */
                rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
@@ -1970,21 +1982,22 @@ int osp_it_next_page(const struct lu_env *env, struct dt_it *di)
        int                     i;
        ENTRY;
 
-again2:
+process_idxpage:
        idxpage = it->ooi_cur_idxpage;
        if (idxpage != NULL) {
                if (idxpage->lip_nr == 0)
-                       RETURN(1);
+                       goto finish_cur_idxpage;
 
                if (it->ooi_pos_ent < idxpage->lip_nr) {
                        CDEBUG(D_INFO, "ooi_pos %d nr %d\n",
                               (int)it->ooi_pos_ent, (int)idxpage->lip_nr);
                        RETURN(0);
                }
+finish_cur_idxpage:
                it->ooi_cur_idxpage = NULL;
                it->ooi_pos_lu_page++;
 
-again1:
+process_page:
                if (it->ooi_pos_lu_page < LU_PAGE_COUNT) {
                        it->ooi_cur_idxpage = (void *)it->ooi_cur_page +
                                         LU_PAGE_SIZE * it->ooi_pos_lu_page;
@@ -2005,19 +2018,19 @@ again1:
                                RETURN(-EINVAL);
                        }
                        it->ooi_pos_ent = -1;
-                       goto again2;
+                       goto process_idxpage;
                }
 
                kunmap(it->ooi_cur_page);
                it->ooi_cur_page = NULL;
                it->ooi_pos_page++;
 
-again0:
+start:
                pages = it->ooi_pages;
                if (it->ooi_pos_page < it->ooi_valid_npages) {
                        it->ooi_cur_page = kmap(pages[it->ooi_pos_page]);
                        it->ooi_pos_lu_page = 0;
-                       goto again1;
+                       goto process_page;
                }
 
                for (i = 0; i < it->ooi_total_npages; i++) {
@@ -2041,7 +2054,7 @@ again0:
 
        rc = osp_it_fetch(env, it);
        if (rc == 0)
-               goto again0;
+               goto start;
 
        RETURN(rc);
 }
@@ -2183,11 +2196,11 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di)
  * \retval             0 for arriving at the end of the iteration
  * \retval             negative error number on failure
  */
-int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
-                      __u64 hash)
+static int osp_orphan_it_load(const struct lu_env *env, const struct dt_it *di,
+                             __u64 hash)
 {
-       struct osp_it   *it     = (struct osp_it *)di;
-       int              rc;
+       struct osp_it *it = (struct osp_it *)di;
+       int rc;
 
        it->ooi_next = hash;
        rc = osp_orphan_it_next(env, (struct dt_it *)di);