Whamcloud - gitweb
LU-15894 ofd: revert range locking in ofd
[fs/lustre-release.git] / lustre / ofd / ofd_io.c
index d86bb5c..8b79362 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/ofd/ofd_io.c
  *
@@ -147,6 +146,17 @@ static void ofd_inconsistency_verify_one(const struct lu_env *env,
        OBD_FREE_PTR(oii);
 }
 
+struct oivm_args {
+       struct ofd_device       *od_ofd;
+       struct lu_env           od_env;
+       struct lfsck_req_local  od_lrl;
+       struct completion       *od_started;
+};
+
+#ifndef TASK_IDLE
+#define TASK_IDLE TASK_INTERRUPTIBLE
+#endif
+
 /**
  * Verification thread to check parent FID consistency.
  *
@@ -158,52 +168,39 @@ static void ofd_inconsistency_verify_one(const struct lu_env *env,
  * \retval             0 on successful thread termination
  * \retval             negative value if thread can't start
  */
-static int ofd_inconsistency_verification_main(void *args)
+static int ofd_inconsistency_verification_main(void *_args)
 {
-       struct lu_env env;
-       struct ofd_device *ofd = args;
-       struct ptlrpc_thread *thread = &ofd->ofd_inconsistency_thread;
+       struct oivm_args *args = _args;
+       struct lu_env *env = &args->od_env;
+       struct ofd_device *ofd = args->od_ofd;
        struct ofd_inconsistency_item *oii;
-       struct lfsck_req_local *lrl = NULL;
-       int rc;
+       struct lfsck_req_local *lrl = &args->od_lrl;
        ENTRY;
 
-       rc = lu_env_init(&env, LCT_DT_THREAD);
-       spin_lock(&ofd->ofd_inconsistency_lock);
-       thread_set_flags(thread, rc ? SVC_STOPPED : SVC_RUNNING);
-       wake_up_all(&thread->t_ctl_waitq);
-       spin_unlock(&ofd->ofd_inconsistency_lock);
-       if (rc)
-               RETURN(rc);
-
-       OBD_ALLOC_PTR(lrl);
-       if (unlikely(!lrl))
-               GOTO(out_unlocked, rc = -ENOMEM);
-
        lrl->lrl_event = LEL_PAIRS_VERIFY_LOCAL;
        lrl->lrl_active = LFSCK_TYPE_LAYOUT;
+       complete(args->od_started);
 
        spin_lock(&ofd->ofd_inconsistency_lock);
-       while (1) {
-               if (unlikely(!thread_is_running(thread)))
-                       break;
+       while (({set_current_state(TASK_IDLE);
+                !kthread_should_stop(); })) {
 
                while (!list_empty(&ofd->ofd_inconsistency_list)) {
+                       __set_current_state(TASK_RUNNING);
                        oii = list_entry(ofd->ofd_inconsistency_list.next,
                                         struct ofd_inconsistency_item,
                                         oii_list);
                        list_del_init(&oii->oii_list);
                        spin_unlock(&ofd->ofd_inconsistency_lock);
-                       ofd_inconsistency_verify_one(&env, ofd, oii, lrl);
+                       ofd_inconsistency_verify_one(env, ofd, oii, lrl);
                        spin_lock(&ofd->ofd_inconsistency_lock);
                }
 
                spin_unlock(&ofd->ofd_inconsistency_lock);
-               wait_event_idle(thread->t_ctl_waitq,
-                               !list_empty(&ofd->ofd_inconsistency_list) ||
-                               !thread_is_running(thread));
+               schedule();
                spin_lock(&ofd->ofd_inconsistency_lock);
        }
+       __set_current_state(TASK_RUNNING);
 
        while (!list_empty(&ofd->ofd_inconsistency_list)) {
                struct ofd_object *fo;
@@ -215,28 +212,20 @@ static int ofd_inconsistency_verification_main(void *args)
                fo = oii->oii_obj;
                spin_unlock(&ofd->ofd_inconsistency_lock);
 
-               ofd_write_lock(&env, fo);
+               ofd_write_lock(env, fo);
                fo->ofo_pfid_checking = 0;
-               ofd_write_unlock(&env, fo);
+               ofd_write_unlock(env, fo);
 
-               ofd_object_put(&env, fo);
+               ofd_object_put(env, fo);
                OBD_FREE_PTR(oii);
                spin_lock(&ofd->ofd_inconsistency_lock);
        }
 
-       OBD_FREE_PTR(lrl);
-
-       GOTO(out, rc = 0);
-
-out_unlocked:
-       spin_lock(&ofd->ofd_inconsistency_lock);
-out:
-       thread_set_flags(thread, SVC_STOPPED);
-       wake_up_all(&thread->t_ctl_waitq);
        spin_unlock(&ofd->ofd_inconsistency_lock);
-       lu_env_fini(&env);
 
-       return rc;
+       lu_env_fini(&args->od_env);
+       OBD_FREE_PTR(args);
+       return 0;
 }
 
 /**
@@ -251,30 +240,50 @@ out:
  */
 int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
 {
-       struct ptlrpc_thread    *thread = &ofd->ofd_inconsistency_thread;
        struct task_struct      *task;
+       struct oivm_args        *args;
+       DECLARE_COMPLETION_ONSTACK(started);
        int                      rc;
 
-       spin_lock(&ofd->ofd_inconsistency_lock);
-       if (unlikely(thread_is_running(thread))) {
-               spin_unlock(&ofd->ofd_inconsistency_lock);
-
+       if (ofd->ofd_inconsistency_task)
                return -EALREADY;
+
+       OBD_ALLOC_PTR(args);
+       if (!args)
+               return -ENOMEM;
+       rc = lu_env_init(&args->od_env, LCT_DT_THREAD);
+       if (rc) {
+               OBD_FREE_PTR(args);
+               return rc;
        }
 
-       thread_set_flags(thread, 0);
-       spin_unlock(&ofd->ofd_inconsistency_lock);
-       task = kthread_run(ofd_inconsistency_verification_main, ofd,
-                          "inconsistency_verification");
+       args->od_ofd = ofd;
+       args->od_started = &started;
+       task = kthread_create(ofd_inconsistency_verification_main, args,
+                             "inconsistency_verification");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
                CERROR("%s: cannot start self_repair thread: rc = %d\n",
                       ofd_name(ofd), rc);
        } else {
                rc = 0;
-               wait_event_idle(thread->t_ctl_waitq,
-                               thread_is_running(thread) ||
-                               thread_is_stopped(thread));
+               spin_lock(&ofd->ofd_inconsistency_lock);
+               if (ofd->ofd_inconsistency_task)
+                       rc = -EALREADY;
+               else
+                       ofd->ofd_inconsistency_task = task;
+               spin_unlock(&ofd->ofd_inconsistency_lock);
+
+               if (rc)
+                       kthread_stop(task);
+               else {
+                       wake_up_process(task);
+                       wait_for_completion(&started);
+               }
+       }
+       if (rc) {
+               lu_env_fini(&args->od_env);
+               OBD_FREE_PTR(args);
        }
 
        return rc;
@@ -290,20 +299,16 @@ int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd)
  */
 int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd)
 {
-       struct ptlrpc_thread    *thread = &ofd->ofd_inconsistency_thread;
+       struct task_struct *task;
 
        spin_lock(&ofd->ofd_inconsistency_lock);
-       if (thread_is_init(thread) || thread_is_stopped(thread)) {
-               spin_unlock(&ofd->ofd_inconsistency_lock);
+       task = ofd->ofd_inconsistency_task;
+       ofd->ofd_inconsistency_task = NULL;
+       spin_unlock(&ofd->ofd_inconsistency_lock);
 
+       if (!task)
                return -EALREADY;
-       }
-
-       thread_set_flags(thread, SVC_STOPPING);
-       spin_unlock(&ofd->ofd_inconsistency_lock);
-       wake_up_all(&thread->t_ctl_waitq);
-       wait_event_idle(thread->t_ctl_waitq,
-                       thread_is_stopped(thread));
+       kthread_stop(task);
 
        return 0;
 }
@@ -351,9 +356,9 @@ static void ofd_add_inconsistency_item(const struct lu_env *env,
        if (list_empty(&ofd->ofd_inconsistency_list))
                wakeup = true;
        list_add_tail(&oii->oii_list, &ofd->ofd_inconsistency_list);
+       if (wakeup && ofd->ofd_inconsistency_task)
+               wake_up_process(ofd->ofd_inconsistency_task);
        spin_unlock(&ofd->ofd_inconsistency_lock);
-       if (wakeup)
-               wake_up_all(&ofd->ofd_inconsistency_thread.t_ctl_waitq);
 
        /* XXX: When the found inconsistency exceeds some threshold,
         *      we can trigger the LFSCK to scan part of the system
@@ -475,6 +480,71 @@ out:
 
 }
 
+/*
+ * Lazy ATIME update: refresh the on-disk atime every ofd_atime_diff
+ * seconds so that an external scanning tool sees an atime that is
+ * accurate to within that period and can identify accessed files.
+ */
+static void ofd_handle_atime(const struct lu_env *env, struct ofd_device *ofd,
+                            struct ofd_object *fo, time64_t atime)
+{
+       struct lu_attr *la;
+       struct dt_object *o;
+       struct thandle *th;
+       int rc;
+
+       if (ofd->ofd_atime_diff == 0)
+               return;
+
+       la = &ofd_info(env)->fti_attr2;
+       o = ofd_object_child(fo);
+
+       if (unlikely(fo->ofo_atime_ondisk == 0)) {
+               rc = dt_attr_get(env, o, la);
+               if (unlikely(rc))
+                       return;
+               LASSERT(la->la_valid & LA_ATIME);
+               if (la->la_atime == 0)
+                       la->la_atime = la->la_mtime;
+               fo->ofo_atime_ondisk = la->la_atime;
+       }
+       if (atime - fo->ofo_atime_ondisk < ofd->ofd_atime_diff)
+               return;
+
+       /* atime hasn't been updated for too long, update it */
+       fo->ofo_atime_ondisk = atime;
+
+       th = ofd_trans_create(env, ofd);
+       if (IS_ERR(th)) {
+               CERROR("%s: cannot create transaction: rc = %d\n",
+                      ofd_name(ofd), (int)PTR_ERR(th));
+               return;
+       }
+
+       la->la_valid = LA_ATIME;
+       rc = dt_declare_attr_set(env, o, la, th);
+       if (rc)
+               GOTO(out_tx, rc);
+
+       rc = dt_trans_start_local(env, ofd->ofd_osd , th);
+       if (rc) {
+               CERROR("%s: cannot start transaction: rc = %d\n",
+                      ofd_name(ofd), rc);
+               GOTO(out_tx, rc);
+       }
+
+       ofd_read_lock(env, fo);
+       if (ofd_object_exists(fo)) {
+               la->la_atime = fo->ofo_atime_ondisk;
+               rc = dt_attr_set(env, o, la, th);
+       }
+
+       ofd_read_unlock(env, fo);
+
+out_tx:
+       ofd_trans_stop(env, ofd, th, rc);
+}
+
 /**
  * Prepare buffers for read request processing.
  *
@@ -500,12 +570,13 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
                           struct ofd_device *ofd, const struct lu_fid *fid,
                           struct lu_attr *la, struct obdo *oa, int niocount,
                           struct niobuf_remote *rnb, int *nr_local,
-                          struct niobuf_local *lnb, char *jobid)
+                          struct niobuf_local *lnb)
 {
        struct ofd_object *fo;
        int i, j, rc, tot_bytes = 0;
        enum dt_bufs_type dbt = DT_BUFS_TYPE_READ;
        int maxlnb = *nr_local;
+       __u64 begin, end;
 
        ENTRY;
        LASSERT(env != NULL);
@@ -517,6 +588,9 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
 
        ofd_info(env)->fti_obj = fo;
 
+       if (oa->o_valid & OBD_MD_FLATIME)
+               ofd_handle_atime(env, ofd, fo, oa->o_atime);
+
        ofd_read_lock(env, fo);
        if (!ofd_object_exists(fo))
                GOTO(unlock, rc = -ENOENT);
@@ -530,7 +604,12 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
        if (ptlrpc_connection_is_local(exp->exp_connection))
                dbt |= DT_BUFS_TYPE_LOCAL;
 
+       begin = -1;
+       end = 0;
+
        for (*nr_local = 0, i = 0, j = 0; i < niocount; i++) {
+               begin = min_t(__u64, begin, rnb[i].rnb_offset);
+               end = max_t(__u64, end, rnb[i].rnb_offset + rnb[i].rnb_len);
 
                if (OBD_FAIL_CHECK(OBD_FAIL_OST_2BIG_NIOBUF))
                        rnb[i].rnb_len = 100 * 1024 * 1024;
@@ -552,8 +631,19 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
        rc = dt_read_prep(env, ofd_object_child(fo), lnb, *nr_local);
        if (unlikely(rc))
                GOTO(buf_put, rc);
+       ofd_read_unlock(env, fo);
+
+       ofd_access(env, ofd,
+               &(struct lu_fid) {
+                       .f_seq = oa->o_parent_seq,
+                       .f_oid = oa->o_parent_oid,
+                       .f_ver = oa->o_stripe_idx,
+               },
+               begin, end,
+               tot_bytes,
+               niocount,
+               READ);
 
-       ofd_counter_incr(exp, LPROC_OFD_STATS_READ, jobid, tot_bytes);
        RETURN(0);
 
 buf_put:
@@ -592,20 +682,21 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                            struct lu_attr *la, struct obdo *oa,
                            int objcount, struct obd_ioobj *obj,
                            struct niobuf_remote *rnb, int *nr_local,
-                           struct niobuf_local *lnb, char *jobid)
+                           struct niobuf_local *lnb)
 {
        struct ofd_object *fo;
        int i, j, k, rc = 0, tot_bytes = 0;
        enum dt_bufs_type dbt = DT_BUFS_TYPE_WRITE;
        int maxlnb = *nr_local;
+       __u64 begin, end;
 
        ENTRY;
        LASSERT(env != NULL);
        LASSERT(objcount == 1);
 
        if (unlikely(exp->exp_obd->obd_recovering)) {
-               u64 seq = fid_seq(fid);
-               u64 oid = fid_oid(fid);
+               u64 seq = ostid_seq(&oa->o_oi);
+               u64 oid = ostid_id(&oa->o_oi);
                struct ofd_seq *oseq;
 
                oseq = ofd_seq_load(env, ofd, seq);
@@ -672,47 +763,30 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
 
        ofd_info(env)->fti_obj = fo;
 
-       ofd_read_lock(env, fo);
        if (!ofd_object_exists(fo)) {
                CERROR("%s: BRW to missing obj "DOSTID"\n",
                       exp->exp_obd->obd_name, POSTID(&obj->ioo_oid));
-               ofd_read_unlock(env, fo);
                ofd_object_put(env, fo);
                GOTO(out, rc = -ENOENT);
        }
 
-       if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
-               rc = ofd_verify_ff(env, fo, oa);
-               if (rc != 0) {
-                       ofd_read_unlock(env, fo);
-                       ofd_object_put(env, fo);
-                       GOTO(out, rc);
-               }
-       }
-
-       /* need to verify layout version */
-       if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
-               rc = ofd_verify_layout_version(env, fo, oa);
-               if (rc) {
-                       ofd_read_unlock(env, fo);
-                       ofd_object_put(env, fo);
-                       GOTO(out, rc);
-               }
-
-               oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
-       }
-
        if (ptlrpc_connection_is_local(exp->exp_connection))
                dbt |= DT_BUFS_TYPE_LOCAL;
 
+       begin = -1;
+       end = 0;
+
        /* parse remote buffers to local buffers and prepare the latter */
        for (*nr_local = 0, i = 0, j = 0; i < obj->ioo_bufcnt; i++) {
+               begin = min_t(__u64, begin, rnb[i].rnb_offset);
+               end = max_t(__u64, end, rnb[i].rnb_offset + rnb[i].rnb_len);
+
                if (OBD_FAIL_CHECK(OBD_FAIL_OST_2BIG_NIOBUF))
                        rnb[i].rnb_len += PAGE_SIZE;
                rc = dt_bufs_get(env, ofd_object_child(fo),
                                 rnb + i, lnb + j, maxlnb, dbt);
                if (unlikely(rc < 0))
-                       GOTO(err, rc);
+                       GOTO(err_nolock, rc);
                LASSERT(rc <= PTLRPC_MAX_BRW_PAGES);
                /* correct index for local buffers to continue with */
                for (k = 0; k < rc; k++) {
@@ -729,16 +803,50 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
        }
        LASSERT(*nr_local > 0 && *nr_local <= PTLRPC_MAX_BRW_PAGES);
 
+       ofd_read_lock(env, fo);
+       if (!ofd_object_exists(fo)) {
+               CERROR("%s: BRW to missing obj "DOSTID": rc = -ENOENT\n",
+                      exp->exp_obd->obd_name, POSTID(&obj->ioo_oid));
+               GOTO(err, rc = -ENOENT);
+       }
+
+       if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
+               rc = ofd_verify_ff(env, fo, oa);
+               if (rc != 0)
+                       GOTO(err, rc);
+       }
+
+       /* need to verify layout version */
+       if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+               rc = ofd_verify_layout_version(env, fo, oa);
+               if (rc)
+                       GOTO(err, rc);
+               oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+       }
+
        rc = dt_write_prep(env, ofd_object_child(fo), lnb, *nr_local);
        if (unlikely(rc != 0))
                GOTO(err, rc);
 
        ofd_read_unlock(env, fo);
-       ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid, tot_bytes);
+
+       ofd_access(env, ofd,
+               &(struct lu_fid) {
+                       .f_seq = oa->o_parent_seq,
+                       .f_oid = oa->o_parent_oid,
+                       .f_ver = oa->o_stripe_idx,
+               },
+               begin, end,
+               tot_bytes,
+               obj->ioo_bufcnt,
+               WRITE);
+
        RETURN(0);
+
 err:
-       dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
        ofd_read_unlock(env, fo);
+err_nolock:
+       dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
        ofd_object_put(env, fo);
        /* tgt_grant_prepare_write() was called, so we must commit */
        tgt_grant_commit(exp, oa->o_grant_used, rc);
@@ -777,7 +885,6 @@ int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
        struct tgt_session_info *tsi = tgt_ses_info(env);
        struct ofd_device       *ofd = ofd_exp(exp);
        struct ofd_thread_info  *info;
-       char                    *jobid;
        const struct lu_fid     *fid = &oa->o_oi.oi_fid;
        int                      rc = 0;
 
@@ -789,10 +896,8 @@ int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
 
        if (tgt_ses_req(tsi) == NULL) { /* echo client case */
                info = ofd_info_init(env, exp);
-               jobid = NULL;
        } else {
                info = tsi2ofd_info(tsi);
-               jobid = tsi->tsi_jobid;
        }
 
        LASSERT(oa != NULL);
@@ -824,12 +929,11 @@ int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
        if (cmd == OBD_BRW_WRITE) {
                la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR);
                rc = ofd_preprw_write(env, exp, ofd, fid, &info->fti_attr, oa,
-                                     objcount, obj, rnb, nr_local, lnb, jobid);
+                                     objcount, obj, rnb, nr_local, lnb);
        } else if (cmd == OBD_BRW_READ) {
                tgt_grant_prepare_read(env, exp, oa);
                rc = ofd_preprw_read(env, exp, ofd, fid, &info->fti_attr, oa,
-                                    obj->ioo_bufcnt, rnb, nr_local, lnb,
-                                    jobid);
+                                    obj->ioo_bufcnt, rnb, nr_local, lnb);
        } else {
                CERROR("%s: wrong cmd %d received!\n",
                       exp->exp_obd->obd_name, cmd);
@@ -869,7 +973,6 @@ ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd,
        LASSERT(ofd_object_exists(fo));
        dt_bufs_put(env, ofd_object_child(fo), lnb, niocount);
 
-       ofd_read_unlock(env, fo);
        ofd_object_put(env, fo);
 
        RETURN(0);
@@ -1106,7 +1209,7 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
        int rc = 0;
        int rc2 = 0;
        int retries = 0;
-       int i;
+       int i, restart = 0;
        bool soft_sync = false;
        bool cb_registered = false;
        bool fake_write = false;
@@ -1139,7 +1242,7 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
        la->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
 
        /* do fake write, to simulate the write case for performance testing */
-       if (OBD_FAIL_CHECK(OBD_FAIL_OST_FAKE_RW)) {
+       if (OBD_FAIL_CHECK_QUIET(OBD_FAIL_OST_FAKE_RW)) {
                struct niobuf_local *last = &lnb[niocount - 1];
                __u64 file_size = last->lnb_file_offset + last->lnb_len;
                __u64 valid = la->la_valid;
@@ -1186,6 +1289,10 @@ retry:
                        GOTO(out_stop, rc);
        }
 
+       /* don't update atime on disk if it is older */
+       if (la->la_valid & LA_ATIME && la->la_atime <= fo->ofo_atime_ondisk)
+               la->la_valid &= ~LA_ATIME;
+
        if (la->la_valid) {
                /* update [mac]time if needed */
                rc = dt_declare_attr_set(env, o, la, th);
@@ -1201,18 +1308,24 @@ retry:
        if (!ofd_object_exists(fo))
                GOTO(out_unlock, rc = -ENOENT);
 
-       if (likely(!fake_write)) {
-               rc = dt_write_commit(env, o, lnb, niocount, th);
-               if (rc)
-                       GOTO(out_unlock, rc);
-       }
-
        /* Don't update timestamps if this write is older than a
         * setattr which modifies the timestamps. b=10150 */
        if (la->la_valid && tgt_fmd_check(exp, fid, info->fti_xid)) {
                rc = dt_attr_set(env, o, la, th);
                if (rc)
                        GOTO(out_unlock, rc);
+               if (la->la_valid & LA_ATIME)
+                       fo->ofo_atime_ondisk = la->la_atime;
+       }
+
+       if (likely(!fake_write)) {
+               OBD_FAIL_TIMEOUT_ORSET(OBD_FAIL_OST_WR_ATTR_DELAY,
+                                      OBD_FAIL_ONCE, cfs_fail_val);
+               rc = dt_write_commit(env, o, lnb, niocount, th, oa->o_size);
+               if (rc) {
+                       restart = th->th_restart_tran;
+                       GOTO(out_unlock, rc);
+               }
        }
 
        /* get attr to return */
@@ -1237,7 +1350,7 @@ out_stop:
                        granted = 0;
        }
 
-       rc2 = ofd_trans_stop(env, ofd, th, rc);
+       rc2 = ofd_trans_stop(env, ofd, th, restart ? 0 : rc);
        if (!rc)
                rc = rc2;
        if (rc == -ENOSPC && retries++ < 3) {
@@ -1246,6 +1359,16 @@ out_stop:
                goto retry;
        }
 
+       if (restart) {
+               retries++;
+               restart = 0;
+               if (retries % 10000 == 0)
+                       CERROR("%s: restart IO write too many times: %d\n",
+                               ofd_name(ofd), retries);
+               CDEBUG(D_INODE, "retry transaction, retries:%d\n",
+                      retries);
+               goto retry;
+       }
        if (!soft_sync)
                /* reset fed_soft_sync_count upon non-SOFT_SYNC RPC */
                atomic_set(&fed->fed_soft_sync_count, 0);
@@ -1286,20 +1409,65 @@ out:
 int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                 struct obdo *oa, int objcount, struct obd_ioobj *obj,
                 struct niobuf_remote *rnb, int npages,
-                struct niobuf_local *lnb, int old_rc)
+                struct niobuf_local *lnb, int old_rc, int nob, ktime_t kstart)
 {
+       struct tgt_session_info *tsi = tgt_ses_info(env);
        struct ofd_thread_info *info = ofd_info(env);
        struct ofd_device *ofd = ofd_exp(exp);
        const struct lu_fid *fid = &oa->o_oi.oi_fid;
        struct ldlm_namespace *ns = ofd->ofd_namespace;
        struct ldlm_resource *rs = NULL;
+       char *jobid;
        __u64 valid;
        int rc = 0;
+       int root_squash = 0;
 
        LASSERT(npages > 0);
 
+       if (tgt_ses_req(tsi) == NULL) { /* echo client case */
+               jobid = NULL;
+       } else {
+               jobid = tsi->tsi_jobid;
+       }
+
        if (cmd == OBD_BRW_WRITE) {
                struct lu_nodemap *nodemap;
+               __u32 mapped_uid, mapped_gid, mapped_projid;
+
+               /* doing this before the commit operation places the counter
+                * update almost immediately after reply to the client, which
+                * gives reasonable time stats and lets us use the actual
+                * bytes of i/o (rather than requested)
+                */
+               ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE_BYTES, jobid, nob);
+               ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid,
+                                ktime_us_delta(ktime_get(), kstart));
+
+               nodemap = nodemap_get_from_exp(exp);
+               if (IS_ERR(nodemap))
+                       RETURN(PTR_ERR(nodemap));
+               mapped_uid = nodemap_map_id(nodemap, NODEMAP_UID,
+                                           NODEMAP_FS_TO_CLIENT,
+                                           oa->o_uid);
+               mapped_gid = nodemap_map_id(nodemap, NODEMAP_GID,
+                                           NODEMAP_FS_TO_CLIENT,
+                                           oa->o_gid);
+               mapped_projid = nodemap_map_id(nodemap, NODEMAP_PROJID,
+                                              NODEMAP_FS_TO_CLIENT,
+                                              oa->o_projid);
+
+               if (!IS_ERR_OR_NULL(nodemap)) {
+                       /* do not bypass quota enforcement if squashed uid */
+                       if (unlikely(mapped_uid == nodemap->nm_squash_uid)) {
+                               int idx;
+
+                               for (idx = 0; idx < npages; idx++)
+                                       lnb[idx].lnb_flags &=
+                                               ~OBD_BRW_SYS_RESOURCE;
+                               root_squash = 1;
+                       }
+                       nodemap_putref(nodemap);
+               }
 
                valid = OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLPROJID |
                        OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
@@ -1340,6 +1508,9 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                                        oa->o_flags = OBD_FL_NO_PRJQUOTA;
                        }
 
+                       if (root_squash)
+                               oa->o_flags |= OBD_FL_ROOT_SQUASH;
+
                        oa->o_valid |= OBD_MD_FLFLAGS;
                        oa->o_valid |= OBD_MD_FLALLQUOTA;
                }
@@ -1364,17 +1535,15 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                /* Convert back to client IDs. LU-9671.
                 * nodemap_get_from_exp() may fail due to nodemap deactivated,
                 * server ID will be returned back to client in that case. */
-               nodemap = nodemap_get_from_exp(exp);
-               if (nodemap != NULL && !IS_ERR(nodemap)) {
-                       oa->o_uid = nodemap_map_id(nodemap, NODEMAP_UID,
-                                                  NODEMAP_FS_TO_CLIENT,
-                                                  oa->o_uid);
-                       oa->o_gid = nodemap_map_id(nodemap, NODEMAP_GID,
-                                                  NODEMAP_FS_TO_CLIENT,
-                                                  oa->o_gid);
-                       nodemap_putref(nodemap);
-               }
+               oa->o_uid = mapped_uid;
+               oa->o_gid = mapped_gid;
+               oa->o_projid = mapped_projid;
        } else if (cmd == OBD_BRW_READ) {
+               /* see comment on LPROC_OFD_STATS_WRITE_BYTES usage above */
+               ofd_counter_incr(exp, LPROC_OFD_STATS_READ_BYTES, jobid, nob);
+               ofd_counter_incr(exp, LPROC_OFD_STATS_READ, jobid,
+                        ktime_us_delta(ktime_get(), kstart));
+
                rc = ofd_commitrw_read(env, ofd, fid, objcount,
                                       npages, lnb);
                if (old_rc)