Whamcloud - gitweb
LU-10048 ofd: take local locks within transaction
[fs/lustre-release.git] / lustre / ofd / ofd_io.c
index 936faf8..092e592 100644 (file)
@@ -660,6 +660,12 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                ofd_seq_put(env, oseq);
        }
 
+       /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
+        * space back if possible, we have to do this outside of the lock as
+        * grant preparation may need to sync whole fs thus wait for all the
+        * transactions to complete. */
+       tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
+
        fo = ofd_object_find(env, ofd, fid);
        if (IS_ERR(fo))
                GOTO(out, rc = PTR_ERR(fo));
@@ -695,10 +701,6 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
        }
 
-       /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
-        * space back if possible */
-       tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
-
        if (ptlrpc_connection_is_local(exp->exp_connection))
                dbt |= DT_BUFS_TYPE_LOCAL;
 
@@ -727,6 +729,7 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
        if (unlikely(rc != 0))
                GOTO(err, rc);
 
+       ofd_read_unlock(env, fo);
        ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid, tot_bytes);
        RETURN(0);
 err:
@@ -915,11 +918,8 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
        if (rc != 0)
                GOTO(out, rc);
 
-       fl = ofd_object_ff_update(env, ofd_obj, oa, ff);
-       if (fl < 0)
-               GOTO(out, rc = fl);
-
-       if (!la->la_valid && !fl)
+       if (!la->la_valid && !(oa->o_valid &
+           (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
                /* no attributes to set */
                GOTO(out, rc = 0);
 
@@ -933,17 +933,10 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
                        GOTO(out_tx, rc);
        }
 
-       if (fl) {
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
-                       ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
-               else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
-                       le32_add_cpu(&ff->ff_parent.f_oid, -1);
-
-               rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf,
-                                         XATTR_NAME_FID, 0, th);
-               if (rc)
-                       GOTO(out_tx, rc);
-       }
+       rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf,
+                       XATTR_NAME_FID, 0, th);
+       if (rc)
+               GOTO(out_tx, rc);
 
        /* We don't need a transno for this operation which will be re-executed
         * anyway when the OST_WRITE (with a transno assigned) is replayed */
@@ -951,33 +944,42 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
        if (rc)
                GOTO(out_tx, rc);
 
+       ofd_read_lock(env, ofd_obj);
+
        /* set uid/gid/projid */
        if (la->la_valid) {
                rc = dt_attr_set(env, dt_obj, la, th);
                if (rc)
-                       GOTO(out_tx, rc);
+                       GOTO(out_unlock, rc);
        }
 
+       fl = ofd_object_ff_update(env, ofd_obj, oa, ff);
+       if (fl <= 0)
+               GOTO(out_unlock, rc = fl);
+
        /* set filter fid EA.
         * FIXME: it holds read lock of ofd object to modify the XATTR_NAME_FID
         * while the write lock should be held. However, it should work because
         * write RPCs only modify ff_{parent,layout} and those information will
         * be the same from all the write RPCs. The reason that fl is not used
         * in dt_xattr_set() is to allow this race. */
-       if (fl) {
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
-                       GOTO(out_tx, rc);
-
-               info->fti_buf.lb_buf = ff;
-               info->fti_buf.lb_len = sizeof(*ff);
-               rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID,
-                                 0, th);
-               if (rc == 0)
-                       filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff));
-       }
-
-       GOTO(out_tx, rc);
-
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
+               GOTO(out_unlock, rc);
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
+               ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
+       else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
+               le32_add_cpu(&ff->ff_parent.f_oid, -1);
+
+       info->fti_buf.lb_buf = ff;
+       info->fti_buf.lb_len = sizeof(*ff);
+       rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID, 0, th);
+       if (rc == 0)
+               filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff));
+
+       GOTO(out_unlock, rc);
+
+out_unlock:
+       ofd_read_unlock(env, ofd_obj);
 out_tx:
        dt_trans_stop(env, ofd->ofd_osd, th);
 out:
@@ -1103,13 +1105,14 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
 
        fo = ofd_object_find(env, ofd, fid);
        LASSERT(fo != NULL);
-       LASSERT(ofd_object_exists(fo));
 
        o = ofd_object_child(fo);
        LASSERT(o != NULL);
 
        if (old_rc)
                GOTO(out, rc = old_rc);
+       if (!ofd_object_exists(fo))
+               GOTO(out, rc = -ENOENT);
 
        /*
         * The first write to each object must set some attributes.  It is
@@ -1182,21 +1185,27 @@ retry:
        if (rc)
                GOTO(out_stop, rc);
 
+       ofd_read_lock(env, fo);
+       if (!ofd_object_exists(fo))
+               GOTO(out_unlock, rc = -ENOENT);
+
        if (likely(!fake_write)) {
                rc = dt_write_commit(env, o, lnb, niocount, th);
                if (rc)
-                       GOTO(out_stop, rc);
+                       GOTO(out_unlock, rc);
        }
 
        if (la->la_valid) {
                rc = dt_attr_set(env, o, la, th);
                if (rc)
-                       GOTO(out_stop, rc);
+                       GOTO(out_unlock, rc);
        }
 
        /* get attr to return */
        rc = dt_attr_get(env, o, la);
 
+out_unlock:
+       ofd_read_unlock(env, fo);
 out_stop:
        /* Force commit to make the just-deleted blocks
         * reusable. LU-456 */
@@ -1232,7 +1241,6 @@ out_stop:
 
 out:
        dt_bufs_put(env, o, lnb, niocount);
-       ofd_read_unlock(env, fo);
        ofd_object_put(env, fo);
        /* second put is pair to object_get in ofd_preprw_write */
        ofd_object_put(env, fo);