Whamcloud - gitweb
LU-4343 tests: mkdir failing in sanity-hsm test 228
[fs/lustre-release.git] / lustre / target / tgt_lastrcvd.c
index 03014a1..b5d2831 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Whamcloud, Inc.
+ * Copyright (c) 2011, 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -61,107 +61,114 @@ static inline struct lu_buf *tti_buf_lcd(struct tgt_thread_info *tti)
 /**
  * Allocate in-memory data for client slot related to export.
  */
-int lut_client_alloc(struct obd_export *exp)
+int tgt_client_alloc(struct obd_export *exp)
 {
-        LASSERT(exp != exp->exp_obd->obd_self_export);
-
-        OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
-        if (exp->exp_target_data.ted_lcd == NULL)
-                RETURN(-ENOMEM);
-        /* Mark that slot is not yet valid, 0 doesn't work here */
-        exp->exp_target_data.ted_lr_idx = -1;
-        RETURN(0);
+       ENTRY;
+       LASSERT(exp != exp->exp_obd->obd_self_export);
+
+       OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
+       if (exp->exp_target_data.ted_lcd == NULL)
+               RETURN(-ENOMEM);
+       /* Mark that slot is not yet valid, 0 doesn't work here */
+       exp->exp_target_data.ted_lr_idx = -1;
+       RETURN(0);
 }
-EXPORT_SYMBOL(lut_client_alloc);
+EXPORT_SYMBOL(tgt_client_alloc);
 
 /**
  * Free in-memory data for client slot related to export.
  */
-void lut_client_free(struct obd_export *exp)
+void tgt_client_free(struct obd_export *exp)
 {
-        struct tg_export_data *ted = &exp->exp_target_data;
-        struct lu_target *lut = class_exp2tgt(exp);
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *lut = class_exp2tgt(exp);
 
-        LASSERT(exp != exp->exp_obd->obd_self_export);
+       LASSERT(exp != exp->exp_obd->obd_self_export);
 
-        OBD_FREE_PTR(ted->ted_lcd);
-        ted->ted_lcd = NULL;
+       OBD_FREE_PTR(ted->ted_lcd);
+       ted->ted_lcd = NULL;
 
-        /* Slot may be not yet assigned */
-        if (ted->ted_lr_idx < 0)
-                return;
-        /* Clear bit when lcd is freed */
+       /* Slot may be not yet assigned */
+       if (ted->ted_lr_idx < 0)
+               return;
+       /* Clear bit when lcd is freed */
        LASSERT(lut->lut_client_bitmap);
-        if (!cfs_test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
-                CERROR("%s: client %u bit already clear in bitmap\n",
-                       exp->exp_obd->obd_name, ted->ted_lr_idx);
-                LBUG();
-        }
+       if (!test_and_clear_bit(ted->ted_lr_idx, lut->lut_client_bitmap)) {
+               CERROR("%s: client %u bit already clear in bitmap\n",
+                      exp->exp_obd->obd_name, ted->ted_lr_idx);
+               LBUG();
+       }
 }
-EXPORT_SYMBOL(lut_client_free);
+EXPORT_SYMBOL(tgt_client_free);
 
-int lut_client_data_read(const struct lu_env *env, struct lu_target *tg,
+int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
                         struct lsd_client_data *lcd, loff_t *off, int index)
 {
-       struct tgt_thread_info *tti = tgt_th_info(env);
-       int rc;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       int                      rc;
 
        tti_buf_lcd(tti);
-       rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, off);
+       rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf, off);
        if (rc == 0) {
-               check_lcd(tg->lut_obd->obd_name, index, &tti->tti_lcd);
+               check_lcd(tgt->lut_obd->obd_name, index, &tti->tti_lcd);
                lcd_le_to_cpu(&tti->tti_lcd, lcd);
+               lcd->lcd_last_result = ptlrpc_status_ntoh(lcd->lcd_last_result);
+               lcd->lcd_last_close_result =
+                       ptlrpc_status_ntoh(lcd->lcd_last_close_result);
        }
 
        CDEBUG(D_INFO, "%s: read lcd @%lld uuid = %s, last_transno = "LPU64
               ", last_xid = "LPU64", last_result = %u, last_data = %u, "
               "last_close_transno = "LPU64", last_close_xid = "LPU64", "
-              "last_close_result = %u, rc = %d\n", tg->lut_obd->obd_name,
+              "last_close_result = %u, rc = %d\n", tgt->lut_obd->obd_name,
               *off, lcd->lcd_uuid, lcd->lcd_last_transno, lcd->lcd_last_xid,
               lcd->lcd_last_result, lcd->lcd_last_data,
               lcd->lcd_last_close_transno, lcd->lcd_last_close_xid,
               lcd->lcd_last_close_result, rc);
        return rc;
 }
-EXPORT_SYMBOL(lut_client_data_read);
+EXPORT_SYMBOL(tgt_client_data_read);
 
-int lut_client_data_write(const struct lu_env *env, struct lu_target *tg,
+int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
                          struct lsd_client_data *lcd, loff_t *off,
                          struct thandle *th)
 {
        struct tgt_thread_info *tti = tgt_th_info(env);
 
+       lcd->lcd_last_result = ptlrpc_status_hton(lcd->lcd_last_result);
+       lcd->lcd_last_close_result =
+               ptlrpc_status_hton(lcd->lcd_last_close_result);
        lcd_cpu_to_le(lcd, &tti->tti_lcd);
        tti_buf_lcd(tti);
 
-       return dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf, off, th);
+       return dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf, off, th);
 }
-EXPORT_SYMBOL(lut_client_data_write);
+EXPORT_SYMBOL(tgt_client_data_write);
 
 /**
  * Update client data in last_rcvd
  */
-int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
+int tgt_client_data_update(const struct lu_env *env, struct obd_export *exp)
 {
-       struct tg_export_data *ted = &exp->exp_target_data;
-       struct lu_target      *tg = class_exp2tgt(exp);
-       struct tgt_thread_info *tti = tgt_th_info(env);
-       struct thandle        *th;
-       int                    rc = 0;
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *tgt = class_exp2tgt(exp);
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct thandle          *th;
+       int                      rc = 0;
 
        ENTRY;
 
-       th = dt_trans_create(env, tg->lut_bottom);
+       th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
-       rc = dt_declare_record_write(env, tg->lut_last_rcvd,
+       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
                                     sizeof(struct lsd_client_data),
                                     ted->ted_lr_off, th);
        if (rc)
                GOTO(out, rc);
 
-       rc = dt_trans_start_local(env, tg->lut_bottom, th);
+       rc = dt_trans_start_local(env, tgt->lut_bottom, th);
        if (rc)
                GOTO(out, rc);
        /*
@@ -170,121 +177,123 @@ int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
         * transaction so that many connecting clients will not bring
         * server down with lots of sync writes.
         */
-       rc = lut_new_client_cb_add(th, exp);
+       rc = tgt_new_client_cb_add(th, exp);
        if (rc) {
                /* can't add callback, do sync now */
                th->th_sync = 1;
        } else {
-               cfs_spin_lock(&exp->exp_lock);
+               spin_lock(&exp->exp_lock);
                exp->exp_need_sync = 1;
-               cfs_spin_unlock(&exp->exp_lock);
+               spin_unlock(&exp->exp_lock);
        }
 
        tti->tti_off = ted->ted_lr_off;
-       rc = lut_client_data_write(env, tg, ted->ted_lcd, &tti->tti_off, th);
+       rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
        EXIT;
 out:
-       dt_trans_stop(env, tg->lut_bottom, th);
+       dt_trans_stop(env, tgt->lut_bottom, th);
        CDEBUG(D_INFO, "%s: update last_rcvd client data for UUID = %s, "
-              "last_transno = "LPU64": rc = %d\n", tg->lut_obd->obd_name,
-              tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno, rc);
+              "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+              tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
 
        return rc;
 }
 
-int lut_server_data_read(const struct lu_env *env, struct lu_target *tg)
+int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
 {
-       struct tgt_thread_info *tti = tgt_th_info(env);
-       int rc;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       int                      rc;
 
        tti->tti_off = 0;
        tti_buf_lsd(tti);
-       rc = dt_record_read(env, tg->lut_last_rcvd, &tti->tti_buf, &tti->tti_off);
+       rc = dt_record_read(env, tgt->lut_last_rcvd, &tti->tti_buf,
+                           &tti->tti_off);
        if (rc == 0)
-               lsd_le_to_cpu(&tti->tti_lsd, &tg->lut_lsd);
+               lsd_le_to_cpu(&tti->tti_lsd, &tgt->lut_lsd);
 
        CDEBUG(D_INFO, "%s: read last_rcvd server data for UUID = %s, "
-              "last_transno = "LPU64": rc = %d\n", tg->lut_obd->obd_name,
-              tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno, rc);
+              "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+              tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
         return rc;
 }
-EXPORT_SYMBOL(lut_server_data_read);
+EXPORT_SYMBOL(tgt_server_data_read);
 
-int lut_server_data_write(const struct lu_env *env, struct lu_target *tg,
+int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
                          struct thandle *th)
 {
-       struct tgt_thread_info *tti = tgt_th_info(env);
-       int rc;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       int                      rc;
+
        ENTRY;
 
        tti->tti_off = 0;
        tti_buf_lsd(tti);
-       lsd_cpu_to_le(&tg->lut_lsd, &tti->tti_lsd);
+       lsd_cpu_to_le(&tgt->lut_lsd, &tti->tti_lsd);
 
-       rc = dt_record_write(env, tg->lut_last_rcvd, &tti->tti_buf,
+       rc = dt_record_write(env, tgt->lut_last_rcvd, &tti->tti_buf,
                             &tti->tti_off, th);
 
        CDEBUG(D_INFO, "%s: write last_rcvd server data for UUID = %s, "
-              "last_transno = "LPU64": rc = %d\n", tg->lut_obd->obd_name,
-              tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno, rc);
+              "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+              tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
 
        RETURN(rc);
 }
-EXPORT_SYMBOL(lut_server_data_write);
+EXPORT_SYMBOL(tgt_server_data_write);
 
 /**
  * Update server data in last_rcvd
  */
-int lut_server_data_update(const struct lu_env *env, struct lu_target *tg,
+int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
                           int sync)
 {
-       struct tgt_thread_info *tti = tgt_th_info(env);
-       struct thandle  *th;
-       int                 rc = 0;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct thandle          *th;
+       int                      rc = 0;
 
        ENTRY;
 
        CDEBUG(D_SUPER,
               "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
-              tg->lut_lsd.lsd_uuid, tg->lut_obd->u.obt.obt_mount_count,
-              tg->lut_last_transno);
+              tgt->lut_lsd.lsd_uuid, tgt->lut_obd->u.obt.obt_mount_count,
+              tgt->lut_last_transno);
 
        /* Always save latest transno to keep it fresh */
-       cfs_spin_lock(&tg->lut_translock);
-       tg->lut_lsd.lsd_last_transno = tg->lut_last_transno;
-       cfs_spin_unlock(&tg->lut_translock);
+       spin_lock(&tgt->lut_translock);
+       tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
+       spin_unlock(&tgt->lut_translock);
 
-       th = dt_trans_create(env, tg->lut_bottom);
+       th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
        th->th_sync = sync;
 
-       rc = dt_declare_record_write(env, tg->lut_last_rcvd,
+       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
                                     sizeof(struct lr_server_data),
                                     tti->tti_off, th);
        if (rc)
                GOTO(out, rc);
 
-       rc = dt_trans_start(env, tg->lut_bottom, th);
+       rc = dt_trans_start(env, tgt->lut_bottom, th);
        if (rc)
                GOTO(out, rc);
 
-       rc = lut_server_data_write(env, tg, th);
+       rc = tgt_server_data_write(env, tgt, th);
 out:
-       dt_trans_stop(env, tg->lut_bottom, th);
+       dt_trans_stop(env, tgt->lut_bottom, th);
 
        CDEBUG(D_INFO, "%s: update last_rcvd server data for UUID = %s, "
-              "last_transno = "LPU64": rc = %d\n", tg->lut_obd->obd_name,
-              tg->lut_lsd.lsd_uuid, tg->lut_lsd.lsd_last_transno, rc);
+              "last_transno = "LPU64": rc = %d\n", tgt->lut_obd->obd_name,
+              tgt->lut_lsd.lsd_uuid, tgt->lut_lsd.lsd_last_transno, rc);
        RETURN(rc);
 }
-EXPORT_SYMBOL(lut_server_data_update);
+EXPORT_SYMBOL(tgt_server_data_update);
 
-int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
+int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
                           loff_t size)
 {
-       struct dt_object *dt = tg->lut_last_rcvd;
+       struct dt_object *dt = tgt->lut_last_rcvd;
        struct thandle   *th;
        struct lu_attr    attr;
        int               rc;
@@ -294,7 +303,7 @@ int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
        attr.la_size = size;
        attr.la_valid = LA_SIZE;
 
-       th = dt_trans_create(env, tg->lut_bottom);
+       th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
        rc = dt_declare_punch(env, dt, size, OBD_OBJECT_EOF, th);
@@ -303,7 +312,7 @@ int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
        rc = dt_declare_attr_set(env, dt, &attr, th);
        if (rc)
                GOTO(cleanup, rc);
-       rc = dt_trans_start_local(env, tg->lut_bottom, th);
+       rc = dt_trans_start_local(env, tgt->lut_bottom, th);
        if (rc)
                GOTO(cleanup, rc);
 
@@ -312,137 +321,138 @@ int lut_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
                rc = dt_attr_set(env, dt, &attr, th, BYPASS_CAPA);
 
 cleanup:
-       dt_trans_stop(env, tg->lut_bottom, th);
+       dt_trans_stop(env, tgt->lut_bottom, th);
 
        RETURN(rc);
 }
-EXPORT_SYMBOL(lut_truncate_last_rcvd);
+EXPORT_SYMBOL(tgt_truncate_last_rcvd);
 
-void lut_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
+void tgt_client_epoch_update(const struct lu_env *env, struct obd_export *exp)
 {
-        struct lsd_client_data *lcd = exp->exp_target_data.ted_lcd;
-        struct lu_target *lut = class_exp2tgt(exp);
-
-        LASSERT(lut->lut_bottom);
-        /** VBR: set client last_epoch to current epoch */
-        if (lcd->lcd_last_epoch >= lut->lut_lsd.lsd_start_epoch)
-                return;
-        lcd->lcd_last_epoch = lut->lut_lsd.lsd_start_epoch;
-        lut_client_data_update(env, exp);
+       struct lsd_client_data  *lcd = exp->exp_target_data.ted_lcd;
+       struct lu_target        *tgt = class_exp2tgt(exp);
+
+       LASSERT(tgt->lut_bottom);
+       /** VBR: set client last_epoch to current epoch */
+       if (lcd->lcd_last_epoch >= tgt->lut_lsd.lsd_start_epoch)
+               return;
+       lcd->lcd_last_epoch = tgt->lut_lsd.lsd_start_epoch;
+       tgt_client_data_update(env, exp);
 }
 
 /**
  * Update boot epoch when recovery ends
  */
-void lut_boot_epoch_update(struct lu_target *lut)
+void tgt_boot_epoch_update(struct lu_target *tgt)
 {
-        struct lu_env env;
-        struct ptlrpc_request *req;
-        __u32 start_epoch;
-        cfs_list_t client_list;
-        int rc;
+       struct lu_env            env;
+       struct ptlrpc_request   *req;
+       __u32                    start_epoch;
+       cfs_list_t               client_list;
+       int                      rc;
 
-        if (lut->lut_obd->obd_stopping)
-                return;
+       if (tgt->lut_obd->obd_stopping)
+               return;
 
        rc = lu_env_init(&env, LCT_DT_THREAD);
-        if (rc) {
-                CERROR("Can't initialize environment rc=%d\n", rc);
-                return;
-        }
-
-        cfs_spin_lock(&lut->lut_translock);
-        start_epoch = lr_epoch(lut->lut_last_transno) + 1;
-        lut->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
-        lut->lut_lsd.lsd_start_epoch = start_epoch;
-        cfs_spin_unlock(&lut->lut_translock);
-
-        CFS_INIT_LIST_HEAD(&client_list);
-        /**
-         * The recovery is not yet finished and final queue can still be updated
-         * with resend requests. Move final list to separate one for processing
-         */
-        cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
-        cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
-        cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
-
-        /**
-         * go through list of exports participated in recovery and
-         * set new epoch for them
-         */
-        cfs_list_for_each_entry(req, &client_list, rq_list) {
-                LASSERT(!req->rq_export->exp_delayed);
-                if (!req->rq_export->exp_vbr_failed)
-                        lut_client_epoch_update(&env, req->rq_export);
-        }
-        /** return list back at once */
-        cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
-        cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
-        cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
-        /** update server epoch */
-        lut_server_data_update(&env, lut, 1);
-        lu_env_fini(&env);
+       if (rc) {
+               CERROR("%s: can't initialize environment: rc = %d\n",
+                       tgt->lut_obd->obd_name, rc);
+               return;
+       }
+
+       spin_lock(&tgt->lut_translock);
+       start_epoch = lr_epoch(tgt->lut_last_transno) + 1;
+       tgt->lut_last_transno = (__u64)start_epoch << LR_EPOCH_BITS;
+       tgt->lut_lsd.lsd_start_epoch = start_epoch;
+       spin_unlock(&tgt->lut_translock);
+
+       CFS_INIT_LIST_HEAD(&client_list);
+       /**
+        * The recovery is not yet finished and final queue can still be updated
+        * with resend requests. Move final list to separate one for processing
+        */
+       spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
+       cfs_list_splice_init(&tgt->lut_obd->obd_final_req_queue, &client_list);
+       spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
+
+       /**
+        * go through list of exports participated in recovery and
+        * set new epoch for them
+        */
+       cfs_list_for_each_entry(req, &client_list, rq_list) {
+               LASSERT(!req->rq_export->exp_delayed);
+               if (!req->rq_export->exp_vbr_failed)
+                       tgt_client_epoch_update(&env, req->rq_export);
+       }
+       /** return list back at once */
+       spin_lock(&tgt->lut_obd->obd_recovery_task_lock);
+       cfs_list_splice_init(&client_list, &tgt->lut_obd->obd_final_req_queue);
+       spin_unlock(&tgt->lut_obd->obd_recovery_task_lock);
+       /** update server epoch */
+       tgt_server_data_update(&env, tgt, 1);
+       lu_env_fini(&env);
 }
-EXPORT_SYMBOL(lut_boot_epoch_update);
+EXPORT_SYMBOL(tgt_boot_epoch_update);
 
 /**
  * commit callback, need to update last_commited value
  */
-struct lut_last_committed_callback {
-        struct dt_txn_commit_cb llcc_cb;
-        struct lu_target       *llcc_lut;
-        struct obd_export      *llcc_exp;
-        __u64                   llcc_transno;
+struct tgt_last_committed_callback {
+       struct dt_txn_commit_cb  llcc_cb;
+       struct lu_target        *llcc_tgt;
+       struct obd_export       *llcc_exp;
+       __u64                    llcc_transno;
 };
 
-void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
-                           struct dt_txn_commit_cb *cb, int err)
+void tgt_cb_last_committed(struct lu_env *env, struct thandle *th,
+                          struct dt_txn_commit_cb *cb, int err)
 {
-        struct lut_last_committed_callback *ccb;
-
-        ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
-
-       LASSERT(ccb->llcc_lut != NULL);
-       LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
-
-        cfs_spin_lock(&ccb->llcc_lut->lut_translock);
-        if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
-                ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
-
-        LASSERT(ccb->llcc_exp);
-        if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
-                ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
-                cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
-                ptlrpc_commit_replies(ccb->llcc_exp);
-        } else {
-                cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
-        }
-        class_export_cb_put(ccb->llcc_exp);
-        if (ccb->llcc_transno)
-                CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
-                       ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
-        OBD_FREE_PTR(ccb);
+       struct tgt_last_committed_callback *ccb;
+
+       ccb = container_of0(cb, struct tgt_last_committed_callback, llcc_cb);
+
+       LASSERT(ccb->llcc_tgt != NULL);
+       LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_tgt->lut_obd);
+
+       spin_lock(&ccb->llcc_tgt->lut_translock);
+       if (ccb->llcc_transno > ccb->llcc_tgt->lut_obd->obd_last_committed)
+               ccb->llcc_tgt->lut_obd->obd_last_committed = ccb->llcc_transno;
+
+       LASSERT(ccb->llcc_exp);
+       if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
+               ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
+               spin_unlock(&ccb->llcc_tgt->lut_translock);
+               ptlrpc_commit_replies(ccb->llcc_exp);
+       } else {
+               spin_unlock(&ccb->llcc_tgt->lut_translock);
+       }
+       class_export_cb_put(ccb->llcc_exp);
+       if (ccb->llcc_transno)
+               CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
+                      ccb->llcc_tgt->lut_obd->obd_name, ccb->llcc_transno);
+       OBD_FREE_PTR(ccb);
 }
 
-int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
-                           struct obd_export *exp, __u64 transno)
+int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
+                          struct obd_export *exp, __u64 transno)
 {
-       struct lut_last_committed_callback *ccb;
-       struct dt_txn_commit_cb            *dcb;
-       int                                rc;
+       struct tgt_last_committed_callback      *ccb;
+       struct dt_txn_commit_cb                 *dcb;
+       int                                      rc;
 
        OBD_ALLOC_PTR(ccb);
        if (ccb == NULL)
                return -ENOMEM;
 
-       ccb->llcc_lut     = lut;
-       ccb->llcc_exp     = class_export_cb_get(exp);
+       ccb->llcc_tgt = tgt;
+       ccb->llcc_exp = class_export_cb_get(exp);
        ccb->llcc_transno = transno;
 
-       dcb            = &ccb->llcc_cb;
-       dcb->dcb_func  = lut_cb_last_committed;
+       dcb = &ccb->llcc_cb;
+       dcb->dcb_func = tgt_cb_last_committed;
        CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
-       strncpy(dcb->dcb_name, "lut_cb_last_committed", MAX_COMMIT_CB_STR_LEN);
+       strncpy(dcb->dcb_name, "tgt_cb_last_committed", MAX_COMMIT_CB_STR_LEN);
        dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
 
        rc = dt_trans_cb_add(th, dcb);
@@ -451,56 +461,56 @@ int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
                OBD_FREE_PTR(ccb);
        }
 
-       if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
+       if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                /* report failure to force synchronous operation */
                return -EPERM;
 
        return rc;
 }
-EXPORT_SYMBOL(lut_last_commit_cb_add);
+EXPORT_SYMBOL(tgt_last_commit_cb_add);
 
-struct lut_new_client_callback {
-        struct dt_txn_commit_cb lncc_cb;
-        struct obd_export      *lncc_exp;
+struct tgt_new_client_callback {
+       struct dt_txn_commit_cb  lncc_cb;
+       struct obd_export       *lncc_exp;
 };
 
-void lut_cb_new_client(struct lu_env *env, struct thandle *th,
-                       struct dt_txn_commit_cb *cb, int err)
+void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
+                      struct dt_txn_commit_cb *cb, int err)
 {
-        struct lut_new_client_callback *ccb;
+       struct tgt_new_client_callback *ccb;
 
-        ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
+       ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
 
-        LASSERT(ccb->lncc_exp->exp_obd);
+       LASSERT(ccb->lncc_exp->exp_obd);
 
-        CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
-               ccb->lncc_exp->exp_obd->obd_name,
-               ccb->lncc_exp->exp_client_uuid.uuid);
+       CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
+              ccb->lncc_exp->exp_obd->obd_name,
+              ccb->lncc_exp->exp_client_uuid.uuid);
 
-        cfs_spin_lock(&ccb->lncc_exp->exp_lock);
-        ccb->lncc_exp->exp_need_sync = 0;
-        cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
-        class_export_cb_put(ccb->lncc_exp);
+       spin_lock(&ccb->lncc_exp->exp_lock);
+       ccb->lncc_exp->exp_need_sync = 0;
+       spin_unlock(&ccb->lncc_exp->exp_lock);
+       class_export_cb_put(ccb->lncc_exp);
 
-        OBD_FREE_PTR(ccb);
+       OBD_FREE_PTR(ccb);
 }
 
-int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
 {
-       struct lut_new_client_callback *ccb;
-       struct dt_txn_commit_cb        *dcb;
-       int                            rc;
+       struct tgt_new_client_callback  *ccb;
+       struct dt_txn_commit_cb         *dcb;
+       int                              rc;
 
        OBD_ALLOC_PTR(ccb);
        if (ccb == NULL)
                return -ENOMEM;
 
-       ccb->lncc_exp  = class_export_cb_get(exp);
+       ccb->lncc_exp = class_export_cb_get(exp);
 
-       dcb            = &ccb->lncc_cb;
-       dcb->dcb_func  = lut_cb_new_client;
+       dcb = &ccb->lncc_cb;
+       dcb->dcb_func = tgt_cb_new_client;
        CFS_INIT_LIST_HEAD(&dcb->dcb_linkage);
-       strncpy(dcb->dcb_name, "lut_cb_new_client", MAX_COMMIT_CB_STR_LEN);
+       strncpy(dcb->dcb_name, "tgt_cb_new_client", MAX_COMMIT_CB_STR_LEN);
        dcb->dcb_name[MAX_COMMIT_CB_STR_LEN - 1] = '\0';
 
        rc = dt_trans_cb_add(th, dcb);
@@ -517,64 +527,64 @@ int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
  * We use a bitmap to locate a free space in the last_rcvd file and initialize
  * tg_export_data.
  */
-int lut_client_new(const struct lu_env *env, struct obd_export *exp)
+int tgt_client_new(const struct lu_env *env, struct obd_export *exp)
 {
-       struct tg_export_data *ted = &exp->exp_target_data;
-       struct lu_target *tg = class_exp2tgt(exp);
-       int rc = 0, idx;
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *tgt = class_exp2tgt(exp);
+       int                      rc = 0, idx;
 
        ENTRY;
 
-       LASSERT(tg->lut_client_bitmap != NULL);
-       if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid))
+       LASSERT(tgt->lut_client_bitmap != NULL);
+       if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid))
                RETURN(0);
 
-       cfs_mutex_init(&ted->ted_lcd_lock);
+       mutex_init(&ted->ted_lcd_lock);
 
-       if ((exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
+       if (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                RETURN(0);
 
        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
         * there's no need for extra complication here
         */
-       idx = cfs_find_first_zero_bit(tg->lut_client_bitmap, LR_MAX_CLIENTS);
+       idx = find_first_zero_bit(tgt->lut_client_bitmap, LR_MAX_CLIENTS);
 repeat:
        if (idx >= LR_MAX_CLIENTS ||
            OBD_FAIL_CHECK(OBD_FAIL_MDS_CLIENT_ADD)) {
                CERROR("%s: no room for %u clients - fix LR_MAX_CLIENTS\n",
-                      tg->lut_obd->obd_name,  idx);
+                      tgt->lut_obd->obd_name,  idx);
                RETURN(-EOVERFLOW);
        }
-       if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
-               idx = cfs_find_next_zero_bit(tg->lut_client_bitmap,
+       if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
+               idx = find_next_zero_bit(tgt->lut_client_bitmap,
                                             LR_MAX_CLIENTS, idx);
                goto repeat;
        }
 
        CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
-              tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
+              tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
 
        ted->ted_lr_idx = idx;
-       ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
-                         idx * tg->lut_lsd.lsd_client_size;
+       ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
+                         idx * tgt->lut_lsd.lsd_client_size;
 
        LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
 
        CDEBUG(D_INFO, "%s: new client at index %d (%llu) with UUID '%s'\n",
-              tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
+              tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
               ted->ted_lcd->lcd_uuid);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_TGT_CLIENT_ADD))
                RETURN(-ENOSPC);
 
-       rc = lut_client_data_update(env, exp);
+       rc = tgt_client_data_update(env, exp);
        if (rc)
                CERROR("%s: Failed to write client lcd at idx %d, rc %d\n",
-                      tg->lut_obd->obd_name, idx, rc);
+                      tgt->lut_obd->obd_name, idx, rc);
 
        RETURN(rc);
 }
-EXPORT_SYMBOL(lut_client_new);
+EXPORT_SYMBOL(tgt_client_new);
 
 /* Add client data to the MDS.  We use a bitmap to locate a free space
  * in the last_rcvd file if cl_off is -1 (i.e. a new client).
@@ -584,46 +594,46 @@ EXPORT_SYMBOL(lut_client_new);
  * It should not be possible to fail adding an existing client - otherwise
  * mdt_init_server_data() callsite needs to be fixed.
  */
-int lut_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
+int tgt_client_add(const struct lu_env *env,  struct obd_export *exp, int idx)
 {
-       struct tg_export_data *ted = &exp->exp_target_data;
-       struct lu_target *tg = class_exp2tgt(exp);
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *tgt = class_exp2tgt(exp);
 
        ENTRY;
 
-       LASSERT(tg->lut_client_bitmap != NULL);
+       LASSERT(tgt->lut_client_bitmap != NULL);
        LASSERTF(idx >= 0, "%d\n", idx);
 
-       if (!strcmp(ted->ted_lcd->lcd_uuid, tg->lut_obd->obd_uuid.uuid) ||
-           (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
+       if (!strcmp(ted->ted_lcd->lcd_uuid, tgt->lut_obd->obd_uuid.uuid) ||
+           exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                RETURN(0);
 
-       if (cfs_test_and_set_bit(idx, tg->lut_client_bitmap)) {
+       if (test_and_set_bit(idx, tgt->lut_client_bitmap)) {
                CERROR("%s: client %d: bit already set in bitmap!!\n",
-                      tg->lut_obd->obd_name,  idx);
+                      tgt->lut_obd->obd_name,  idx);
                LBUG();
        }
 
        CDEBUG(D_INFO, "%s: client at idx %d with UUID '%s' added\n",
-              tg->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
+              tgt->lut_obd->obd_name, idx, ted->ted_lcd->lcd_uuid);
 
        ted->ted_lr_idx = idx;
-       ted->ted_lr_off = tg->lut_lsd.lsd_client_start +
-                         idx * tg->lut_lsd.lsd_client_size;
+       ted->ted_lr_off = tgt->lut_lsd.lsd_client_start +
+                         idx * tgt->lut_lsd.lsd_client_size;
 
-       cfs_mutex_init(&ted->ted_lcd_lock);
+       mutex_init(&ted->ted_lcd_lock);
 
        LASSERTF(ted->ted_lr_off > 0, "ted_lr_off = %llu\n", ted->ted_lr_off);
 
        RETURN(0);
 }
-EXPORT_SYMBOL(lut_client_add);
+EXPORT_SYMBOL(tgt_client_add);
 
-int lut_client_del(const struct lu_env *env, struct obd_export *exp)
+int tgt_client_del(const struct lu_env *env, struct obd_export *exp)
 {
-       struct tg_export_data *ted = &exp->exp_target_data;
-       struct lu_target      *tg = class_exp2tgt(exp);
-       int                    rc;
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       struct lu_target        *tgt = class_exp2tgt(exp);
+       int                      rc;
 
        ENTRY;
 
@@ -631,19 +641,19 @@ int lut_client_del(const struct lu_env *env, struct obd_export *exp)
 
        /* XXX if lcd_uuid were a real obd_uuid, I could use obd_uuid_equals */
        if (!strcmp((char *)ted->ted_lcd->lcd_uuid,
-                   (char *)tg->lut_obd->obd_uuid.uuid) ||
-           (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) != 0)
+                   (char *)tgt->lut_obd->obd_uuid.uuid) ||
+           exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT)
                RETURN(0);
 
        CDEBUG(D_INFO, "%s: del client at idx %u, off %lld, UUID '%s'\n",
-              tg->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
+              tgt->lut_obd->obd_name, ted->ted_lr_idx, ted->ted_lr_off,
               ted->ted_lcd->lcd_uuid);
 
        /* Clear the bit _after_ zeroing out the client so we don't
           race with filter_client_add and zero out new clients.*/
-       if (!cfs_test_bit(ted->ted_lr_idx, tg->lut_client_bitmap)) {
+       if (!test_bit(ted->ted_lr_idx, tgt->lut_client_bitmap)) {
                CERROR("%s: client %u: bit already clear in bitmap!!\n",
-                      tg->lut_obd->obd_name, ted->ted_lr_idx);
+                      tgt->lut_obd->obd_name, ted->ted_lr_idx);
                LBUG();
        }
 
@@ -654,23 +664,580 @@ int lut_client_del(const struct lu_env *env, struct obd_export *exp)
        /* Make sure the server's last_transno is up to date.
         * This should be done before zeroing client slot so last_transno will
         * be in server data or in client data in case of failure */
-       rc = lut_server_data_update(env, tg, 0);
+       rc = tgt_server_data_update(env, tgt, 0);
        if (rc != 0) {
                CERROR("%s: failed to update server data, skip client %s "
-                      "zeroing, rc %d\n", tg->lut_obd->obd_name,
+                      "zeroing, rc %d\n", tgt->lut_obd->obd_name,
                       ted->ted_lcd->lcd_uuid, rc);
                RETURN(rc);
        }
 
-       cfs_mutex_lock(&ted->ted_lcd_lock);
+       mutex_lock(&ted->ted_lcd_lock);
        memset(ted->ted_lcd->lcd_uuid, 0, sizeof ted->ted_lcd->lcd_uuid);
-       rc = lut_client_data_update(env, exp);
-       cfs_mutex_unlock(&ted->ted_lcd_lock);
+       rc = tgt_client_data_update(env, exp);
+       mutex_unlock(&ted->ted_lcd_lock);
 
        CDEBUG(rc == 0 ? D_INFO : D_ERROR,
               "%s: zeroing out client %s at idx %u (%llu), rc %d\n",
-              tg->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
+              tgt->lut_obd->obd_name, ted->ted_lcd->lcd_uuid,
               ted->ted_lr_idx, ted->ted_lr_off, rc);
        RETURN(rc);
 }
-EXPORT_SYMBOL(lut_client_del);
+EXPORT_SYMBOL(tgt_client_del);
+
+/*
+ * last_rcvd & last_committed update callbacks
+ */
+int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
+                        struct dt_object *obj, __u64 opdata,
+                        struct thandle *th, struct ptlrpc_request *req)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct tg_export_data   *ted;
+       __u64                   *transno_p;
+       int                      rc = 0;
+       bool                     lw_client, update = false;
+
+       ENTRY;
+
+       ted = &req->rq_export->exp_target_data;
+
+       lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
+       if (ted->ted_lr_idx < 0 && !lw_client)
+               /* ofd connect may cause transaction before export has
+                * last_rcvd slot */
+               RETURN(0);
+
+       tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+
+       spin_lock(&tgt->lut_translock);
+       if (th->th_result != 0) {
+               if (tti->tti_transno != 0) {
+                       CERROR("%s: replay transno "LPU64" failed: rc = %d\n",
+                              tgt_name(tgt), tti->tti_transno, th->th_result);
+               }
+       } else if (tti->tti_transno == 0) {
+               tti->tti_transno = ++tgt->lut_last_transno;
+       } else {
+               /* should be replay */
+               if (tti->tti_transno > tgt->lut_last_transno)
+                       tgt->lut_last_transno = tti->tti_transno;
+       }
+       spin_unlock(&tgt->lut_translock);
+
+       /** VBR: set new versions */
+       if (th->th_result == 0 && obj != NULL)
+               dt_version_set(env, obj, tti->tti_transno, th);
+
+       /* filling reply data */
+       CDEBUG(D_INODE, "transno = "LPU64", last_committed = "LPU64"\n",
+              tti->tti_transno, tgt->lut_obd->obd_last_committed);
+
+       req->rq_transno = tti->tti_transno;
+       lustre_msg_set_transno(req->rq_repmsg, tti->tti_transno);
+
+       /* if can't add callback, do sync write */
+       th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, req->rq_export,
+                                               tti->tti_transno);
+
+       if (lw_client) {
+               /* All operations performed by LW clients are synchronous and
+                * we store the committed transno in the last_rcvd header */
+               spin_lock(&tgt->lut_translock);
+               if (tti->tti_transno > tgt->lut_lsd.lsd_last_transno) {
+                       tgt->lut_lsd.lsd_last_transno = tti->tti_transno;
+                       update = true;
+               }
+               spin_unlock(&tgt->lut_translock);
+               /* Although lightweight (LW) connections have no slot in
+                * last_rcvd, we still want to maintain the in-memory
+                * lsd_client_data structure in order to properly handle reply
+                * reconstruction. */
+       } else if (ted->ted_lr_off == 0) {
+               CERROR("%s: client idx %d has offset %lld\n",
+                      tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
+               RETURN(-EINVAL);
+       }
+
+       /* if the export has already been disconnected, we have no last_rcvd
+        * slot, update server data with latest transno then */
+       if (ted->ted_lcd == NULL) {
+               CWARN("commit transaction for disconnected client %s: rc %d\n",
+                     req->rq_export->exp_client_uuid.uuid, rc);
+               GOTO(srv_update, rc = 0);
+       }
+
+       mutex_lock(&ted->ted_lcd_lock);
+       LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
+       if (lustre_msg_get_opc(req->rq_reqmsg) == MDS_CLOSE ||
+           lustre_msg_get_opc(req->rq_reqmsg) == MDS_DONE_WRITING) {
+               transno_p = &ted->ted_lcd->lcd_last_close_transno;
+               ted->ted_lcd->lcd_last_close_xid = req->rq_xid;
+               ted->ted_lcd->lcd_last_close_result = th->th_result;
+       } else {
+               /* VBR: save versions in last_rcvd for reconstruct. */
+               __u64 *pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+
+               if (pre_versions) {
+                       ted->ted_lcd->lcd_pre_versions[0] = pre_versions[0];
+                       ted->ted_lcd->lcd_pre_versions[1] = pre_versions[1];
+                       ted->ted_lcd->lcd_pre_versions[2] = pre_versions[2];
+                       ted->ted_lcd->lcd_pre_versions[3] = pre_versions[3];
+               }
+               transno_p = &ted->ted_lcd->lcd_last_transno;
+               ted->ted_lcd->lcd_last_xid = req->rq_xid;
+               ted->ted_lcd->lcd_last_result = th->th_result;
+               /* XXX: lcd_last_data is __u32 but intent_dispostion is __u64,
+                * see struct ldlm_reply->lock_policy_res1; */
+               ted->ted_lcd->lcd_last_data = opdata;
+       }
+
+       /* Update transno in slot only if non-zero number, i.e. no errors */
+       if (likely(tti->tti_transno != 0)) {
+               if (*transno_p > tti->tti_transno &&
+                   !tgt->lut_no_reconstruct) {
+                       CERROR("%s: trying to overwrite bigger transno:"
+                              "on-disk: "LPU64", new: "LPU64" replay: %d. "
+                              "see LU-617.\n", tgt_name(tgt), *transno_p,
+                              tti->tti_transno, req_is_replay(req));
+                       if (req_is_replay(req)) {
+                               spin_lock(&req->rq_export->exp_lock);
+                               req->rq_export->exp_vbr_failed = 1;
+                               spin_unlock(&req->rq_export->exp_lock);
+                       }
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       RETURN(req_is_replay(req) ? -EOVERFLOW : 0);
+               }
+               *transno_p = tti->tti_transno;
+       }
+
+       if (!lw_client) {
+               tti->tti_off = ted->ted_lr_off;
+               rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+               if (rc < 0) {
+                       mutex_unlock(&ted->ted_lcd_lock);
+                       RETURN(rc);
+               }
+       }
+       mutex_unlock(&ted->ted_lcd_lock);
+       EXIT;
+srv_update:
+       if (update)
+               rc = tgt_server_data_write(env, tgt, th);
+       return rc;
+}
+
+/*
+ * last_rcvd update for echo client simulation.
+ * It updates last_rcvd client slot and version of object in
+ * simple way but with all locks to simulate all drawbacks
+ */
+int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
+                             struct dt_object *obj, struct thandle *th,
+                             struct obd_export *exp)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       int                      rc = 0;
+
+       ENTRY;
+
+       tti->tti_transno = 0;
+
+       spin_lock(&tgt->lut_translock);
+       if (th->th_result == 0)
+               tti->tti_transno = ++tgt->lut_last_transno;
+       spin_unlock(&tgt->lut_translock);
+
+       /** VBR: set new versions */
+       if (th->th_result == 0 && obj != NULL)
+               dt_version_set(env, obj, tti->tti_transno, th);
+
+       /* if can't add callback, do sync write */
+       th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
+                                               tti->tti_transno);
+
+       LASSERT(ted->ted_lr_off > 0);
+
+       mutex_lock(&ted->ted_lcd_lock);
+       LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
+       ted->ted_lcd->lcd_last_transno = tti->tti_transno;
+       ted->ted_lcd->lcd_last_result = th->th_result;
+
+       tti->tti_off = ted->ted_lr_off;
+       rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+       mutex_unlock(&ted->ted_lcd_lock);
+       RETURN(rc);
+}
+
+int tgt_clients_data_init(const struct lu_env *env, struct lu_target *tgt,
+                         unsigned long last_size)
+{
+       struct obd_device       *obd = tgt->lut_obd;
+       struct lr_server_data   *lsd = &tgt->lut_lsd;
+       struct lsd_client_data  *lcd = NULL;
+       struct tg_export_data   *ted;
+       int                      cl_idx;
+       int                      rc = 0;
+       loff_t                   off = lsd->lsd_client_start;
+
+       ENTRY;
+
+       CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
+                sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
+
+       OBD_ALLOC_PTR(lcd);
+       if (lcd == NULL)
+               RETURN(-ENOMEM);
+
+       for (cl_idx = 0; off < last_size; cl_idx++) {
+               struct obd_export       *exp;
+               __u64                    last_transno;
+
+               /* Don't assume off is incremented properly by
+                * read_record(), in case sizeof(*lcd)
+                * isn't the same as fsd->lsd_client_size.  */
+               off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
+               rc = tgt_client_data_read(env, tgt, lcd, &off, cl_idx);
+               if (rc) {
+                       CERROR("%s: error reading last_rcvd %s idx %d off "
+                              "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
+                              cl_idx, off, rc);
+                       rc = 0;
+                       break; /* read error shouldn't cause startup to fail */
+               }
+
+               if (lcd->lcd_uuid[0] == '\0') {
+                       CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+                              cl_idx);
+                       continue;
+               }
+
+               last_transno = lcd_last_transno(lcd);
+
+               /* These exports are cleaned up by disconnect, so they
+                * need to be set up like real exports as connect does.
+                */
+               CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
+                      " srv lr: "LPU64" lx: "LPU64"\n", lcd->lcd_uuid, cl_idx,
+                      last_transno, lsd->lsd_last_transno, lcd_last_xid(lcd));
+
+               exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
+               if (IS_ERR(exp)) {
+                       if (PTR_ERR(exp) == -EALREADY) {
+                               /* export already exists, zero out this one */
+                               CERROR("%s: Duplicate export %s!\n",
+                                      tgt_name(tgt), lcd->lcd_uuid);
+                               continue;
+                       }
+                       GOTO(err_out, rc = PTR_ERR(exp));
+               }
+
+               ted = &exp->exp_target_data;
+               *ted->ted_lcd = *lcd;
+
+               rc = tgt_client_add(env, exp, cl_idx);
+               LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
+               /* VBR: set export last committed version */
+               exp->exp_last_committed = last_transno;
+               spin_lock(&exp->exp_lock);
+               exp->exp_connecting = 0;
+               exp->exp_in_recovery = 0;
+               spin_unlock(&exp->exp_lock);
+               obd->obd_max_recoverable_clients++;
+               class_export_put(exp);
+
+               /* Need to check last_rcvd even for duplicated exports. */
+               CDEBUG(D_OTHER, "client at idx %d has last_transno = "LPU64"\n",
+                      cl_idx, last_transno);
+
+               spin_lock(&tgt->lut_translock);
+               tgt->lut_last_transno = max(last_transno,
+                                           tgt->lut_last_transno);
+               spin_unlock(&tgt->lut_translock);
+       }
+
+err_out:
+       OBD_FREE_PTR(lcd);
+       RETURN(rc);
+}
+
+struct server_compat_data {
+       __u32 rocompat;
+       __u32 incompat;
+       __u32 rocinit;
+       __u32 incinit;
+};
+
+static struct server_compat_data tgt_scd[] = {
+       [LDD_F_SV_TYPE_MDT] = {
+               .rocompat = OBD_ROCOMPAT_LOVOBJID,
+               .incompat = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
+                           OBD_INCOMPAT_FID | OBD_INCOMPAT_IAM_DIR |
+                           OBD_INCOMPAT_LMM_VER | OBD_INCOMPAT_MULTI_OI,
+               .rocinit = OBD_ROCOMPAT_LOVOBJID,
+               .incinit = OBD_INCOMPAT_MDT | OBD_INCOMPAT_COMMON_LR |
+                          OBD_INCOMPAT_MULTI_OI,
+       },
+       [LDD_F_SV_TYPE_OST] = {
+               .rocompat = 0,
+               .incompat = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR |
+                           OBD_INCOMPAT_FID,
+               .rocinit = 0,
+               .incinit = OBD_INCOMPAT_OST | OBD_INCOMPAT_COMMON_LR,
+       }
+};
+
+int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
+{
+       struct tgt_thread_info          *tti = tgt_th_info(env);
+       struct lr_server_data           *lsd = &tgt->lut_lsd;
+       unsigned long                    last_rcvd_size;
+       __u32                            index;
+       int                              rc, type;
+
+       rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr, BYPASS_CAPA);
+       if (rc)
+               RETURN(rc);
+
+       last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
+
+       /* ensure padding in the struct is the correct size */
+       CLASSERT(offsetof(struct lr_server_data, lsd_padding) +
+                sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
+
+       rc = server_name2index(tgt_name(tgt), &index, NULL);
+       if (rc < 0) {
+               CERROR("%s: Can not get index from name: rc = %d\n",
+                      tgt_name(tgt), rc);
+               RETURN(rc);
+       }
+       /* server_name2index() returns type */
+       type = rc;
+       if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
+               CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
+               RETURN(-EINVAL);
+       }
+
+       /* last_rcvd on OST doesn't provide reconstruct support because there
+        * may be up to 8 in-flight write requests per single slot in
+        * last_rcvd client data
+        */
+       tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
+
+       if (last_rcvd_size == 0) {
+               LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
+
+               memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
+                      sizeof(lsd->lsd_uuid));
+               lsd->lsd_last_transno = 0;
+               lsd->lsd_mount_count = 0;
+               lsd->lsd_server_size = LR_SERVER_SIZE;
+               lsd->lsd_client_start = LR_CLIENT_START;
+               lsd->lsd_client_size = LR_CLIENT_SIZE;
+               lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
+               lsd->lsd_osd_index = index;
+               lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
+               lsd->lsd_feature_incompat = tgt_scd[type].incinit;
+       } else {
+               rc = tgt_server_data_read(env, tgt);
+               if (rc) {
+                       CERROR("%s: error reading LAST_RCVD: rc= %d\n",
+                              tgt_name(tgt), rc);
+                       RETURN(rc);
+               }
+               if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
+                       LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
+                                          "using the wrong disk %s. Were the"
+                                          " /dev/ assignments rearranged?\n",
+                                          tgt->lut_obd->obd_uuid.uuid,
+                                          lsd->lsd_uuid);
+                       RETURN(-EINVAL);
+               }
+
+               if (lsd->lsd_osd_index != index) {
+                       LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd "
+                                          "is different with the index %d in"
+                                          "config log, It might be disk"
+                                          "corruption!\n", tgt_name(tgt),
+                                          lsd->lsd_osd_index, index);
+                       RETURN(-EINVAL);
+               }
+       }
+
+       if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
+               CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
+                      tgt_name(tgt),
+                      lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
+               RETURN(-EINVAL);
+       }
+       if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
+               CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
+                      tgt_name(tgt),
+                      lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
+               RETURN(-EINVAL);
+       }
+       /** Interop: evict all clients at first boot with 1.8 last_rcvd */
+       if (type == LDD_F_SV_TYPE_MDT &&
+           !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
+               if (last_rcvd_size > lsd->lsd_client_start) {
+                       LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
+                                     "remove all clients for interop needs\n",
+                                     tgt_name(tgt));
+                       rc = tgt_truncate_last_rcvd(env, tgt,
+                                                   lsd->lsd_client_start);
+                       if (rc)
+                               RETURN(rc);
+                       last_rcvd_size = lsd->lsd_client_start;
+               }
+               /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
+               lsd->lsd_feature_compat |= OBD_COMPAT_20;
+       }
+
+       lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
+
+       spin_lock(&tgt->lut_translock);
+       tgt->lut_last_transno = lsd->lsd_last_transno;
+       spin_unlock(&tgt->lut_translock);
+
+       lsd->lsd_mount_count++;
+
+       CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
+       CDEBUG(D_INODE, "%s: server last_transno: "LPU64"\n",
+              tgt_name(tgt), tgt->lut_last_transno);
+       CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
+              tgt_name(tgt), lsd->lsd_mount_count);
+       CDEBUG(D_INODE, "%s: server data size: %u\n",
+              tgt_name(tgt), lsd->lsd_server_size);
+       CDEBUG(D_INODE, "%s: per-client data start: %u\n",
+              tgt_name(tgt), lsd->lsd_client_start);
+       CDEBUG(D_INODE, "%s: per-client data size: %u\n",
+              tgt_name(tgt), lsd->lsd_client_size);
+       CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
+              tgt_name(tgt), last_rcvd_size);
+       CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
+              tgt_name(tgt), lsd->lsd_subdir_count);
+       CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
+              last_rcvd_size <= lsd->lsd_client_start ? 0 :
+              (last_rcvd_size - lsd->lsd_client_start) /
+               lsd->lsd_client_size);
+       CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
+
+       if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
+           lsd->lsd_client_size == 0) {
+               CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
+               RETURN(-EINVAL);
+       }
+
+       if (!tgt->lut_obd->obd_replayable)
+               CWARN("%s: recovery support OFF\n", tgt_name(tgt));
+
+       rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
+       if (rc < 0)
+               GOTO(err_client, rc);
+
+       spin_lock(&tgt->lut_translock);
+       /* obd_last_committed is used for compatibility
+        * with other lustre recovery code */
+       tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
+       spin_unlock(&tgt->lut_translock);
+
+       tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
+       tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
+
+       /* save it, so mount count and last_transno is current */
+       rc = tgt_server_data_update(env, tgt, 0);
+       if (rc < 0)
+               GOTO(err_client, rc);
+
+       RETURN(0);
+
+err_client:
+       class_disconnect_exports(tgt->lut_obd);
+       return rc;
+}
+
+/* add credits for last_rcvd update */
+int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
+                    void *cookie)
+{
+       struct lu_target        *tgt = cookie;
+       struct tgt_session_info *tsi;
+       int                      rc;
+
+       /* if there is no session, then this transaction is not result of
+        * request processing but some local operation */
+       if (env->le_ses == NULL)
+               return 0;
+
+       LASSERT(tgt->lut_last_rcvd);
+       tsi = tgt_ses_info(env);
+       /* OFD may start transaction without export assigned */
+       if (tsi->tsi_exp == NULL)
+               return 0;
+
+       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
+                                    sizeof(struct lsd_client_data),
+                                    tsi->tsi_exp->exp_target_data.ted_lr_off,
+                                    th);
+       if (rc)
+               return rc;
+
+       rc = dt_declare_record_write(env, tgt->lut_last_rcvd,
+                                    sizeof(struct lr_server_data), 0, th);
+       if (rc)
+               return rc;
+
+       if (tsi->tsi_vbr_obj != NULL &&
+           !lu_object_remote(&tsi->tsi_vbr_obj->do_lu))
+               rc = dt_declare_version_set(env, tsi->tsi_vbr_obj, th);
+
+       return rc;
+}
+
+/* Update last_rcvd records with latests transaction data */
+int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
+                   void *cookie)
+{
+       struct lu_target        *tgt = cookie;
+       struct tgt_session_info *tsi;
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct dt_object        *obj = NULL;
+       int                      rc;
+       bool                     echo_client;
+
+       if (env->le_ses == NULL)
+               return 0;
+
+       tsi = tgt_ses_info(env);
+       /* OFD may start transaction without export assigned */
+       if (tsi->tsi_exp == NULL)
+               return 0;
+
+       echo_client = (tgt_ses_req(tsi) == NULL);
+
+       if (tti->tti_has_trans && !echo_client) {
+               if (tti->tti_mult_trans == 0) {
+                       CDEBUG(D_HA, "More than one transaction "LPU64"\n",
+                              tti->tti_transno);
+                       RETURN(0);
+               }
+               /* we need another transno to be assigned */
+               tti->tti_transno = 0;
+       } else if (th->th_result == 0) {
+               tti->tti_has_trans = 1;
+       }
+
+       if (tsi->tsi_vbr_obj != NULL &&
+           !lu_object_remote(&tsi->tsi_vbr_obj->do_lu)) {
+               obj = tsi->tsi_vbr_obj;
+       }
+
+       if (unlikely(echo_client)) /* echo client special case */
+               rc = tgt_last_rcvd_update_echo(env, tgt, obj, th,
+                                              tsi->tsi_exp);
+       else
+               rc = tgt_last_rcvd_update(env, tgt, obj, tsi->tsi_opdata, th,
+                                         tgt_ses_req(tsi));
+       return rc;
+}