Whamcloud - gitweb
LU-15894 ofd: revert range locking in ofd
[fs/lustre-release.git] / lustre / ofd / ofd_io.c
index dc5230a..8b79362 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/ofd/ofd_io.c
  *
@@ -147,6 +146,17 @@ static void ofd_inconsistency_verify_one(const struct lu_env *env,
        OBD_FREE_PTR(oii);
 }
 
+struct oivm_args {
+       struct ofd_device       *od_ofd;
+       struct lu_env           od_env;
+       struct lfsck_req_local  od_lrl;
+       struct completion       *od_started;
+};
+
+#ifndef TASK_IDLE
+#define TASK_IDLE TASK_INTERRUPTIBLE
+#endif
+
 /**
  * Verification thread to check parent FID consistency.
  *
@@ -158,52 +168,39 @@ static void ofd_inconsistency_verify_one(const struct lu_env *env,
  * \retval             0 on successful thread termination
  * \retval             negative value if thread can't start
  */
-static int ofd_inconsistency_verification_main(void *args)
+static int ofd_inconsistency_verification_main(void *_args)
 {
-       struct lu_env env;
-       struct ofd_device *ofd = args;
-       struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
+       struct oivm_args *args = _args;
+       struct lu_env *env = &args->od_env;
+       struct ofd_device *ofd = args->od_ofd;
        struct ofd_inconsistency_item *oii;
-       struct lfsck_req_local *lrl = NULL;
-       int rc;
+       struct lfsck_req_local *lrl = &args->od_lrl;
        ENTRY;
 
-       rc = lu_env_init(&env, LCT_DT_THREAD);
-       spin_lock(&ofd->ofd_inconsistency_lock);
-       thread_set_flags(thread, rc ? SVC_STOPPED : SVC_RUNNING);
-       wake_up_all(&thread->t_ctl_waitq);
-       spin_unlock(&ofd->ofd_inconsistency_lock);
-       if (rc)
-               RETURN(rc);
-
-       OBD_ALLOC_PTR(lrl);
-       if (unlikely(!lrl))
-               GOTO(out_unlocked, rc = -ENOMEM);
-
        lrl->lrl_event = LEL_PAIRS_VERIFY_LOCAL;
        lrl->lrl_active = LFSCK_TYPE_LAYOUT;
+       complete(args->od_started);
 
        spin_lock(&ofd->ofd_inconsistency_lock);
-       while (1) {
-               if (unlikely(!thread_is_running(thread)))
-                       break;
+       while (({set_current_state(TASK_IDLE);
+                !kthread_should_stop(); })) {
 
                while (!list_empty(&ofd->ofd_inconsistency_list)) {
+                       __set_current_state(TASK_RUNNING);
                        oii = list_entry(ofd->ofd_inconsistency_list.next,
                                         struct ofd_inconsistency_item,
                                         oii_list);
                        list_del_init(&oii->oii_list);
                        spin_unlock(&ofd->ofd_inconsistency_lock);
-                       ofd_inconsistency_verify_one(&env, ofd, oii, lrl);
+                       ofd_inconsistency_verify_one(env, ofd, oii, lrl);
                        spin_lock(&ofd->ofd_inconsistency_lock);
                }
 
                spin_unlock(&ofd->ofd_inconsistency_lock);
-               wait_event_idle(thread->t_ctl_waitq,
-                               !list_empty(&ofd->ofd_inconsistency_list) ||
-                               !thread_is_running(thread));
+               schedule();
                spin_lock(&ofd->ofd_inconsistency_lock);
        }
+       __set_current_state(TASK_RUNNING);
 
        while (!list_empty(&ofd->ofd_inconsistency_list)) {
                struct ofd_object *fo;
@@ -215,28 +212,20 @@ static int ofd_inconsistency_verification_main(void *args)
                fo = oii->oii_obj;
                spin_unlock(&ofd->ofd_inconsistency_lock);
 
-               ofd_write_lock(&env, fo);
+               ofd_write_lock(env, fo);
                fo->ofo_pfid_checking = 0;
-               ofd_write_unlock(&env, fo);
+               ofd_write_unlock(env, fo);
 
-               ofd_object_put(&env, fo);
+               ofd_object_put(env, fo);
                OBD_FREE_PTR(oii);
                spin_lock(&ofd->ofd_inconsistency_lock);
        }
 
-       OBD_FREE_PTR(lrl);
-
-       GOTO(out, rc = 0);
-
-out_unlocked:
-       spin_lock(&ofd->ofd_inconsistency_lock);
-out:
-       thread_set_flags(thread, SVC_STOPPED);
-       wake_up_all(&thread->t_ctl_waitq);
        spin_unlock(&ofd->ofd_inconsistency_lock);
-       lu_env_fini(&env);
 
-       return rc;
+       lu_env_fini(&args->od_env);
+       OBD_FREE_PTR(args);
+       return 0;
 }
 
 /**
@@ -251,30 +240,50 @@ out:
  */
 int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
 {
-       struct ptlrpc_thread    *thread = &ofd->ofd_inconsistency_thread;
        struct task_struct      *task;
+       struct oivm_args        *args;
+       DECLARE_COMPLETION_ONSTACK(started);
        int                      rc;
 
-       spin_lock(&ofd->ofd_inconsistency_lock);
-       if (unlikely(thread_is_running(thread))) {
-               spin_unlock(&ofd->ofd_inconsistency_lock);
-
+       if (ofd->ofd_inconsistency_task)
                return -EALREADY;
+
+       OBD_ALLOC_PTR(args);
+       if (!args)
+               return -ENOMEM;
+       rc = lu_env_init(&args->od_env, LCT_DT_THREAD);
+       if (rc) {
+               OBD_FREE_PTR(args);
+               return rc;
        }
 
-       thread_set_flags(thread, 0);
-       spin_unlock(&ofd->ofd_inconsistency_lock);
-       task = kthread_run(ofd_inconsistency_verification_main, ofd,
-                          "inconsistency_verification");
+       args->od_ofd = ofd;
+       args->od_started = &started;
+       task = kthread_create(ofd_inconsistency_verification_main, args,
+                             "inconsistency_verification");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                CERROR("%s: cannot start self_repair thread: rc = %d\n",
                       ofd_name(ofd), rc);
        } else {
                rc = 0;
-               wait_event_idle(thread->t_ctl_waitq,
-                               thread_is_running(thread) ||
-                               thread_is_stopped(thread));
+               spin_lock(&ofd->ofd_inconsistency_lock);
+               if (ofd->ofd_inconsistency_task)
+                       rc = -EALREADY;
+               else
+                       ofd->ofd_inconsistency_task = task;
+               spin_unlock(&ofd->ofd_inconsistency_lock);
+
+               if (rc)
+                       kthread_stop(task);
+               else {
+                       wake_up_process(task);
+                       wait_for_completion(&started);
+               }
+       }
+       if (rc) {
+               lu_env_fini(&args->od_env);
+               OBD_FREE_PTR(args);
        }
 
        return rc;
@@ -290,20 +299,16 @@ int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
  */
 int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd)
 {
-       struct ptlrpc_thread    *thread = &ofd->ofd_inconsistency_thread;
+       struct task_struct *task;
 
        spin_lock(&ofd->ofd_inconsistency_lock);
-       if (thread_is_init(thread) || thread_is_stopped(thread)) {
-               spin_unlock(&ofd->ofd_inconsistency_lock);
+       task = ofd->ofd_inconsistency_task;
+       ofd->ofd_inconsistency_task = NULL;
+       spin_unlock(&ofd->ofd_inconsistency_lock);
 
+       if (!task)
                return -EALREADY;
-       }
-
-       thread_set_flags(thread, SVC_STOPPING);
-       spin_unlock(&ofd->ofd_inconsistency_lock);
-       wake_up_all(&thread->t_ctl_waitq);
-       wait_event_idle(thread->t_ctl_waitq,
-                       thread_is_stopped(thread));
+       kthread_stop(task);
 
        return 0;
 }
@@ -351,9 +356,9 @@ static void ofd_add_inconsistency_item(const struct lu_env *env,
        if (list_empty(&ofd->ofd_inconsistency_list))
                wakeup = true;
        list_add_tail(&oii->oii_list, &ofd->ofd_inconsistency_list);
+       if (wakeup && ofd->ofd_inconsistency_task)
+               wake_up_process(ofd->ofd_inconsistency_task);
        spin_unlock(&ofd->ofd_inconsistency_lock);
-       if (wakeup)
-               wake_up_all(&ofd->ofd_inconsistency_thread.t_ctl_waitq);
 
        /* XXX: When the found inconsistency exceeds some threshold,
         *      we can trigger the LFSCK to scan part of the system
@@ -565,14 +570,13 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
                           struct ofd_device *ofd, const struct lu_fid *fid,
                           struct lu_attr *la, struct obdo *oa, int niocount,
                           struct niobuf_remote *rnb, int *nr_local,
-                          struct niobuf_local *lnb, char *jobid)
+                          struct niobuf_local *lnb)
 {
        struct ofd_object *fo;
        int i, j, rc, tot_bytes = 0;
        enum dt_bufs_type dbt = DT_BUFS_TYPE_READ;
        int maxlnb = *nr_local;
        __u64 begin, end;
-       ktime_t kstart = ktime_get();
 
        ENTRY;
        LASSERT(env != NULL);
@@ -627,6 +631,7 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
        rc = dt_read_prep(env, ofd_object_child(fo), lnb, *nr_local);
        if (unlikely(rc))
                GOTO(buf_put, rc);
+       ofd_read_unlock(env, fo);
 
        ofd_access(env, ofd,
                &(struct lu_fid) {
@@ -639,9 +644,6 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
                niocount,
                READ);
 
-       ofd_counter_incr(exp, LPROC_OFD_STATS_READ_BYTES, jobid, tot_bytes);
-       ofd_counter_incr(exp, LPROC_OFD_STATS_READ, jobid,
-                        ktime_us_delta(ktime_get(), kstart));
        RETURN(0);
 
 buf_put:
@@ -680,22 +682,21 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                            struct lu_attr *la, struct obdo *oa,
                            int objcount, struct obd_ioobj *obj,
                            struct niobuf_remote *rnb, int *nr_local,
-                           struct niobuf_local *lnb, char *jobid)
+                           struct niobuf_local *lnb)
 {
        struct ofd_object *fo;
        int i, j, k, rc = 0, tot_bytes = 0;
        enum dt_bufs_type dbt = DT_BUFS_TYPE_WRITE;
        int maxlnb = *nr_local;
        __u64 begin, end;
-       ktime_t kstart = ktime_get();
 
        ENTRY;
        LASSERT(env != NULL);
        LASSERT(objcount == 1);
 
        if (unlikely(exp->exp_obd->obd_recovering)) {
-               u64 seq = fid_seq(fid);
-               u64 oid = fid_oid(fid);
+               u64 seq = ostid_seq(&oa->o_oi);
+               u64 oid = ostid_id(&oa->o_oi);
                struct ofd_seq *oseq;
 
                oseq = ofd_seq_load(env, ofd, seq);
@@ -762,36 +763,13 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
 
        ofd_info(env)->fti_obj = fo;
 
-       ofd_read_lock(env, fo);
        if (!ofd_object_exists(fo)) {
                CERROR("%s: BRW to missing obj "DOSTID"\n",
                       exp->exp_obd->obd_name, POSTID(&obj->ioo_oid));
-               ofd_read_unlock(env, fo);
                ofd_object_put(env, fo);
                GOTO(out, rc = -ENOENT);
        }
 
-       if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
-               rc = ofd_verify_ff(env, fo, oa);
-               if (rc != 0) {
-                       ofd_read_unlock(env, fo);
-                       ofd_object_put(env, fo);
-                       GOTO(out, rc);
-               }
-       }
-
-       /* need to verify layout version */
-       if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
-               rc = ofd_verify_layout_version(env, fo, oa);
-               if (rc) {
-                       ofd_read_unlock(env, fo);
-                       ofd_object_put(env, fo);
-                       GOTO(out, rc);
-               }
-
-               oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
-       }
-
        if (ptlrpc_connection_is_local(exp->exp_connection))
                dbt |= DT_BUFS_TYPE_LOCAL;
 
@@ -808,7 +786,7 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                rc = dt_bufs_get(env, ofd_object_child(fo),
                                 rnb + i, lnb + j, maxlnb, dbt);
                if (unlikely(rc < 0))
-                       GOTO(err, rc);
+                       GOTO(err_nolock, rc);
                LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);
                /* correct index for local buffers to continue with */
                for (k = 0; k < rc; k++) {
@@ -825,6 +803,27 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
        }
        LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES);
 
+       ofd_read_lock(env, fo);
+       if (!ofd_object_exists(fo)) {
+               CERROR("%s: BRW to missing obj "DOSTID": rc = -ENOENT\n",
+                      exp->exp_obd->obd_name, POSTID(&obj->ioo_oid));
+               GOTO(err, rc = -ENOENT);
+       }
+
+       if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
+               rc = ofd_verify_ff(env, fo, oa);
+               if (rc != 0)
+                       GOTO(err, rc);
+       }
+
+       /* need to verify layout version */
+       if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+               rc = ofd_verify_layout_version(env, fo, oa);
+               if (rc)
+                       GOTO(err, rc);
+               oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+       }
+
        rc = dt_write_prep(env, ofd_object_child(fo), lnb, *nr_local);
        if (unlikely(rc != 0))
                GOTO(err, rc);
@@ -842,13 +841,12 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                obj->ioo_bufcnt,
                WRITE);
 
-       ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE_BYTES, jobid, tot_bytes);
-       ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid,
-                        ktime_us_delta(ktime_get(), kstart));
        RETURN(0);
+
 err:
-       dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
        ofd_read_unlock(env, fo);
+err_nolock:
+       dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
        ofd_object_put(env, fo);
        /* tgt_grant_prepare_write() was called, so we must commit */
        tgt_grant_commit(exp, oa->o_grant_used, rc);
@@ -887,7 +885,6 @@ int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
        struct tgt_session_info *tsi = tgt_ses_info(env);
        struct ofd_device       *ofd = ofd_exp(exp);
        struct ofd_thread_info  *info;
-       char                    *jobid;
        const struct lu_fid     *fid = &oa->o_oi.oi_fid;
        int                      rc = 0;
 
@@ -899,10 +896,8 @@ int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
 
        if (tgt_ses_req(tsi) == NULL) { /* echo client case */
                info = ofd_info_init(env, exp);
-               jobid = NULL;
        } else {
                info = tsi2ofd_info(tsi);
-               jobid = tsi->tsi_jobid;
        }
 
        LASSERT(oa != NULL);
@@ -934,12 +929,11 @@ int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
        if (cmd == OBD_BRW_WRITE) {
                la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR);
                rc = ofd_preprw_write(env, exp, ofd, fid, &info->fti_attr, oa,
-                                     objcount, obj, rnb, nr_local, lnb, jobid);
+                                     objcount, obj, rnb, nr_local, lnb);
        } else if (cmd == OBD_BRW_READ) {
                tgt_grant_prepare_read(env, exp, oa);
                rc = ofd_preprw_read(env, exp, ofd, fid, &info->fti_attr, oa,
-                                    obj->ioo_bufcnt, rnb, nr_local, lnb,
-                                    jobid);
+                                    obj->ioo_bufcnt, rnb, nr_local, lnb);
        } else {
                CERROR("%s: wrong cmd %d received!\n",
                       exp->exp_obd->obd_name, cmd);
@@ -979,7 +973,6 @@ ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd,
        LASSERT(ofd_object_exists(fo));
        dt_bufs_put(env, ofd_object_child(fo), lnb, niocount);
 
-       ofd_read_unlock(env, fo);
        ofd_object_put(env, fo);
 
        RETURN(0);
@@ -1216,7 +1209,7 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
        int rc = 0;
        int rc2 = 0;
        int retries = 0;
-       int i;
+       int i, restart = 0;
        bool soft_sync = false;
        bool cb_registered = false;
        bool fake_write = false;
@@ -1249,7 +1242,7 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
        la->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
 
        /* do fake write, to simulate the write case for performance testing */
-       if (OBD_FAIL_CHECK(OBD_FAIL_OST_FAKE_RW)) {
+       if (OBD_FAIL_CHECK_QUIET(OBD_FAIL_OST_FAKE_RW)) {
                struct niobuf_local *last = &lnb[niocount - 1];
                __u64 file_size = last->lnb_file_offset + last->lnb_len;
                __u64 valid = la->la_valid;
@@ -1315,14 +1308,6 @@ retry:
        if (!ofd_object_exists(fo))
                GOTO(out_unlock, rc = -ENOENT);
 
-       if (likely(!fake_write)) {
-               OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_OST_WR_ATTR_DELAY,
-                                      OBD_FAIL_ONCE, cfs_fail_val);
-               rc = dt_write_commit(env, o, lnb, niocount, th, oa->o_size);
-               if (rc)
-                       GOTO(out_unlock, rc);
-       }
-
        /* Don't update timestamps if this write is older than a
         * setattr which modifies the timestamps. b=10150 */
        if (la->la_valid && tgt_fmd_check(exp, fid, info->fti_xid)) {
@@ -1333,6 +1318,16 @@ retry:
                        fo->ofo_atime_ondisk = la->la_atime;
        }
 
+       if (likely(!fake_write)) {
+               OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_OST_WR_ATTR_DELAY,
+                                      OBD_FAIL_ONCE, cfs_fail_val);
+               rc = dt_write_commit(env, o, lnb, niocount, th, oa->o_size);
+               if (rc) {
+                       restart = th->th_restart_tran;
+                       GOTO(out_unlock, rc);
+               }
+       }
+
        /* get attr to return */
        rc = dt_attr_get(env, o, la);
 
@@ -1355,7 +1350,7 @@ out_stop:
                        granted = 0;
        }
 
-       rc2 = ofd_trans_stop(env, ofd, th, rc);
+       rc2 = ofd_trans_stop(env, ofd, th, restart ? 0 : rc);
        if (!rc)
                rc = rc2;
        if (rc == -ENOSPC && retries++ < 3) {
@@ -1364,6 +1359,16 @@ out_stop:
                goto retry;
        }
 
+       if (restart) {
+               retries++;
+               restart = 0;
+               if (retries % 10000 == 0)
+                       CERROR("%s: restart IO write too many times: %d\n",
+                               ofd_name(ofd), retries);
+               CDEBUG(D_INODE, "retry transaction, retries:%d\n",
+                      retries);
+               goto retry;
+       }
        if (!soft_sync)
                /* reset fed_soft_sync_count upon non-SOFT_SYNC RPC */
                atomic_set(&fed->fed_soft_sync_count, 0);
@@ -1404,20 +1409,65 @@ out:
 int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                 struct obdo *oa, int objcount, struct obd_ioobj *obj,
                 struct niobuf_remote *rnb, int npages,
-                struct niobuf_local *lnb, int old_rc)
+                struct niobuf_local *lnb, int old_rc, int nob, ktime_t kstart)
 {
+       struct tgt_session_info *tsi = tgt_ses_info(env);
        struct ofd_thread_info *info = ofd_info(env);
        struct ofd_device *ofd = ofd_exp(exp);
        const struct lu_fid *fid = &oa->o_oi.oi_fid;
        struct ldlm_namespace *ns = ofd->ofd_namespace;
        struct ldlm_resource *rs = NULL;
+       char *jobid;
        __u64 valid;
        int rc = 0;
+       int root_squash = 0;
 
        LASSERT(npages > 0);
 
+       if (tgt_ses_req(tsi) == NULL) { /* echo client case */
+               jobid = NULL;
+       } else {
+               jobid = tsi->tsi_jobid;
+       }
+
        if (cmd == OBD_BRW_WRITE) {
                struct lu_nodemap *nodemap;
+               __u32 mapped_uid, mapped_gid, mapped_projid;
+
+               /* doing this before the commit operation places the counter
+                * update almost immediately after reply to the client, which
+                * gives reasonable time stats and lets us use the actual
+                * bytes of i/o (rather than requested)
+                */
+               ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE_BYTES, jobid, nob);
+               ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid,
+                                ktime_us_delta(ktime_get(), kstart));
+
+               nodemap = nodemap_get_from_exp(exp);
+               if (IS_ERR(nodemap))
+                       RETURN(PTR_ERR(nodemap));
+               mapped_uid = nodemap_map_id(nodemap, NODEMAP_UID,
+                                           NODEMAP_FS_TO_CLIENT,
+                                           oa->o_uid);
+               mapped_gid = nodemap_map_id(nodemap, NODEMAP_GID,
+                                           NODEMAP_FS_TO_CLIENT,
+                                           oa->o_gid);
+               mapped_projid = nodemap_map_id(nodemap, NODEMAP_PROJID,
+                                              NODEMAP_FS_TO_CLIENT,
+                                              oa->o_projid);
+
+               if (!IS_ERR_OR_NULL(nodemap)) {
+                       /* do not bypass quota enforcement if squashed uid */
+                       if (unlikely(mapped_uid == nodemap->nm_squash_uid)) {
+                               int idx;
+
+                               for (idx = 0; idx < npages; idx++)
+                                       lnb[idx].lnb_flags &=
+                                               ~OBD_BRW_SYS_RESOURCE;
+                               root_squash = 1;
+                       }
+                       nodemap_putref(nodemap);
+               }
 
                valid = OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLPROJID |
                        OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
@@ -1458,6 +1508,9 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                                        oa->o_flags = OBD_FL_NO_PRJQUOTA;
                        }
 
+                       if (root_squash)
+                               oa->o_flags |= OBD_FL_ROOT_SQUASH;
+
                        oa->o_valid |= OBD_MD_FLFLAGS;
                        oa->o_valid |= OBD_MD_FLALLQUOTA;
                }
@@ -1482,17 +1535,15 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                /* Convert back to client IDs. LU-9671.
                 * nodemap_get_from_exp() may fail due to nodemap deactivated,
                 * server ID will be returned back to client in that case. */
-               nodemap = nodemap_get_from_exp(exp);
-               if (nodemap != NULL && !IS_ERR(nodemap)) {
-                       oa->o_uid = nodemap_map_id(nodemap, NODEMAP_UID,
-                                                  NODEMAP_FS_TO_CLIENT,
-                                                  oa->o_uid);
-                       oa->o_gid = nodemap_map_id(nodemap, NODEMAP_GID,
-                                                  NODEMAP_FS_TO_CLIENT,
-                                                  oa->o_gid);
-                       nodemap_putref(nodemap);
-               }
+               oa->o_uid = mapped_uid;
+               oa->o_gid = mapped_gid;
+               oa->o_projid = mapped_projid;
        } else if (cmd == OBD_BRW_READ) {
+               /* see comment on LPROC_OFD_STATS_WRITE_BYTES usage above */
+               ofd_counter_incr(exp, LPROC_OFD_STATS_READ_BYTES, jobid, nob);
+               ofd_counter_incr(exp, LPROC_OFD_STATS_READ, jobid,
+                        ktime_us_delta(ktime_get(), kstart));
+
                rc = ofd_commitrw_read(env, ofd, fid, objcount,
                                       npages, lnb);
                if (old_rc)