Whamcloud - gitweb
LU-10496 ofd: serialize fmd check and set
[fs/lustre-release.git] / lustre / ofd / ofd_io.c
index 78c1a08..a5d49e1 100644 (file)
@@ -511,6 +511,7 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
        struct ofd_object *fo;
        int i, j, rc, tot_bytes = 0;
        enum dt_bufs_type dbt = DT_BUFS_TYPE_READ;
+       int maxlnb = *nr_local;
 
        ENTRY;
        LASSERT(env != NULL);
@@ -534,14 +535,19 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
                dbt |= DT_BUFS_TYPE_LOCAL;
 
        for (*nr_local = 0, i = 0, j = 0; i < niocount; i++) {
+
+               if (OBD_FAIL_CHECK(OBD_FAIL_OST_2BIG_NIOBUF))
+                       rnb[i].rnb_len = 100 * 1024 * 1024;
+
                rc = dt_bufs_get(env, ofd_object_child(fo), rnb + i,
-                                lnb + j, dbt);
+                                lnb + j, maxlnb, dbt);
                if (unlikely(rc < 0))
                        GOTO(buf_put, rc);
                LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);
                /* correct index for local buffers to continue with */
                j += rc;
                *nr_local += rc;
+               maxlnb -= rc;
                LASSERT(j <= PTLRPC_MAX_BRW_PAGES);
                tot_bytes += rnb[i].rnb_len;
        }
@@ -599,6 +605,7 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
        struct ofd_object *fo;
        int i, j, k, rc = 0, tot_bytes = 0;
        enum dt_bufs_type dbt = DT_BUFS_TYPE_WRITE;
+       int maxlnb = *nr_local;
 
        ENTRY;
        LASSERT(env != NULL);
@@ -660,6 +667,12 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                ofd_seq_put(env, oseq);
        }
 
+       /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
+        * space back if possible, we have to do this outside of the lock as
+        * grant preparation may need to sync whole fs thus wait for all the
+        * transactions to complete. */
+       tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
+
        fo = ofd_object_find(env, ofd, fid);
        if (IS_ERR(fo))
                GOTO(out, rc = PTR_ERR(fo));
@@ -695,17 +708,15 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
        }
 
-       /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
-        * space back if possible */
-       tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
-
        if (ptlrpc_connection_is_local(exp->exp_connection))
                dbt |= DT_BUFS_TYPE_LOCAL;
 
        /* parse remote buffers to local buffers and prepare the latter */
        for (*nr_local = 0, i = 0, j = 0; i < obj->ioo_bufcnt; i++) {
+               if (OBD_FAIL_CHECK(OBD_FAIL_OST_2BIG_NIOBUF))
+                       rnb[i].rnb_len += PAGE_SIZE;
                rc = dt_bufs_get(env, ofd_object_child(fo),
-                                rnb + i, lnb + j, dbt);
+                                rnb + i, lnb + j, maxlnb, dbt);
                if (unlikely(rc < 0))
                        GOTO(err, rc);
                LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);
@@ -718,6 +729,7 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                }
                j += rc;
                *nr_local += rc;
+               maxlnb -= rc;
                LASSERT(j <= PTLRPC_MAX_BRW_PAGES);
                tot_bytes += rnb[i].rnb_len;
        }
@@ -727,6 +739,7 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
        if (unlikely(rc != 0))
                GOTO(err, rc);
 
+       ofd_read_unlock(env, fo);
        ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid, tot_bytes);
        RETURN(0);
 err:
@@ -915,11 +928,8 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
        if (rc != 0)
                GOTO(out, rc);
 
-       fl = ofd_object_ff_update(env, ofd_obj, oa, ff);
-       if (fl < 0)
-               GOTO(out, rc = fl);
-
-       if (!la->la_valid && !fl)
+       if (!la->la_valid && !(oa->o_valid &
+           (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
                /* no attributes to set */
                GOTO(out, rc = 0);
 
@@ -933,17 +943,10 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
                        GOTO(out_tx, rc);
        }
 
-       if (fl) {
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
-                       ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
-               else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
-                       le32_add_cpu(&ff->ff_parent.f_oid, -1);
-
-               rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf,
-                                         XATTR_NAME_FID, 0, th);
-               if (rc)
-                       GOTO(out_tx, rc);
-       }
+       rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf,
+                       XATTR_NAME_FID, 0, th);
+       if (rc)
+               GOTO(out_tx, rc);
 
        /* We don't need a transno for this operation which will be re-executed
         * anyway when the OST_WRITE (with a transno assigned) is replayed */
@@ -951,33 +954,42 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
        if (rc)
                GOTO(out_tx, rc);
 
+       ofd_read_lock(env, ofd_obj);
+
        /* set uid/gid/projid */
        if (la->la_valid) {
                rc = dt_attr_set(env, dt_obj, la, th);
                if (rc)
-                       GOTO(out_tx, rc);
+                       GOTO(out_unlock, rc);
        }
 
+       fl = ofd_object_ff_update(env, ofd_obj, oa, ff);
+       if (fl <= 0)
+               GOTO(out_unlock, rc = fl);
+
        /* set filter fid EA.
         * FIXME: it holds read lock of ofd object to modify the XATTR_NAME_FID
         * while the write lock should be held. However, it should work because
         * write RPCs only modify ff_{parent,layout} and those information will
         * be the same from all the write RPCs. The reason that fl is not used
         * in dt_xattr_set() is to allow this race. */
-       if (fl) {
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
-                       GOTO(out_tx, rc);
-
-               info->fti_buf.lb_buf = ff;
-               info->fti_buf.lb_len = sizeof(*ff);
-               rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID,
-                                 0, th);
-               if (rc == 0)
-                       filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff));
-       }
-
-       GOTO(out_tx, rc);
-
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
+               GOTO(out_unlock, rc);
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
+               ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
+       else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
+               le32_add_cpu(&ff->ff_parent.f_oid, -1);
+
+       info->fti_buf.lb_buf = ff;
+       info->fti_buf.lb_len = sizeof(*ff);
+       rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID, 0, th);
+       if (rc == 0)
+               filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff));
+
+       GOTO(out_unlock, rc);
+
+out_unlock:
+       ofd_read_unlock(env, ofd_obj);
 out_tx:
        dt_trans_stop(env, ofd->ofd_osd, th);
 out:
@@ -1085,6 +1097,7 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
                   int niocount, struct niobuf_local *lnb,
                   unsigned long granted, int old_rc)
 {
+       struct ofd_thread_info *info = ofd_info(env);
        struct filter_export_data *fed = &exp->exp_filter_data;
        struct ofd_object *fo;
        struct dt_object *o;
@@ -1103,13 +1116,14 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
 
        fo = ofd_object_find(env, ofd, fid);
        LASSERT(fo != NULL);
-       LASSERT(ofd_object_exists(fo));
 
        o = ofd_object_child(fo);
        LASSERT(o != NULL);
 
        if (old_rc)
                GOTO(out, rc = old_rc);
+       if (!ofd_object_exists(fo))
+               GOTO(out, rc = -ENOENT);
 
        /*
         * The first write to each object must set some attributes.  It is
@@ -1182,21 +1196,29 @@ retry:
        if (rc)
                GOTO(out_stop, rc);
 
+       ofd_read_lock(env, fo);
+       if (!ofd_object_exists(fo))
+               GOTO(out_unlock, rc = -ENOENT);
+
        if (likely(!fake_write)) {
                rc = dt_write_commit(env, o, lnb, niocount, th);
                if (rc)
-                       GOTO(out_stop, rc);
+                       GOTO(out_unlock, rc);
        }
 
-       if (la->la_valid) {
+       /* Don't update timestamps if this write is older than a
+        * setattr which modifies the timestamps. b=10150 */
+       if (la->la_valid && tgt_fmd_check(exp, fid, info->fti_xid)) {
                rc = dt_attr_set(env, o, la, th);
                if (rc)
-                       GOTO(out_stop, rc);
+                       GOTO(out_unlock, rc);
        }
 
        /* get attr to return */
        rc = dt_attr_get(env, o, la);
 
+out_unlock:
+       ofd_read_unlock(env, fo);
 out_stop:
        /* Force commit to make the just-deleted blocks
         * reusable. LU-456 */
@@ -1232,7 +1254,6 @@ out_stop:
 
 out:
        dt_bufs_put(env, o, lnb, niocount);
-       ofd_read_unlock(env, fo);
        ofd_object_put(env, fo);
        /* second put is pair to object_get in ofd_preprw_write */
        ofd_object_put(env, fo);
@@ -1279,13 +1300,8 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
        if (cmd == OBD_BRW_WRITE) {
                struct lu_nodemap *nodemap;
 
-               /* Don't update timestamps if this write is older than a
-                * setattr which modifies the timestamps. b=10150 */
-               valid = OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLPROJID;
-               if (tgt_fmd_check(exp, fid, info->fti_xid))
-                       valid |= OBD_MD_FLATIME | OBD_MD_FLMTIME |
-                                OBD_MD_FLCTIME;
-
+               valid = OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLPROJID |
+                       OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
                la_from_obdo(&info->fti_attr, oa, valid);
 
                rc = ofd_commitrw_write(env, exp, ofd, fid, &info->fti_attr,
@@ -1353,7 +1369,7 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                        rs = ldlm_resource_get(ns, NULL, &info->fti_resid,
                                               LDLM_EXTENT, 0);
                        if (!IS_ERR(rs)) {
-                               ldlm_res_lvbo_update(env, rs, NULL, 1);
+                               ldlm_res_lvbo_update(rs, NULL, 1);
                                ldlm_resource_putref(rs);
                        }
                }