Whamcloud - gitweb
LU-3467 ofd: use unified handler for OST requests 30/7130/35
author Mikhail Pershin <mike.pershin@intel.com>
Fri, 26 Jul 2013 11:29:22 +0000 (15:29 +0400)
committer Oleg Drokin <oleg.drokin@intel.com>
Thu, 14 Nov 2013 08:10:10 +0000 (08:10 +0000)
Switch OST/OFD request processing to the unified request
handler.

Signed-off-by: Mikhail Pershin <mike.pershin@intel.com>
Change-Id: I27a2f8c2345b1aad3d714eab1e138d0c9d2ec258
Reviewed-on: http://review.whamcloud.com/7130
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
24 files changed:
lustre/include/linux/obd_class.h
lustre/include/lu_target.h
lustre/include/lustre_req_layout.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_lib.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_lproc.c
lustre/obdclass/genops.c
lustre/obdclass/linux/linux-obdo.c
lustre/obdecho/echo_client.c
lustre/ofd/lproc_ofd.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_internal.h
lustre/ofd/ofd_io.c
lustre/ofd/ofd_obd.c
lustre/ofd/ofd_trans.c
lustre/osp/osp_precreate.c
lustre/ost/ost_handler.c
lustre/ost/ost_internal.h
lustre/ptlrpc/layout.c
lustre/target/tgt_handler.c
lustre/target/tgt_internal.h
lustre/target/tgt_lastrcvd.c
lustre/target/tgt_main.c

index ecdd66d..2d2abbd 100644 (file)
@@ -55,7 +55,7 @@
 /* obdo.c */
 #ifdef __KERNEL__
 void obdo_from_la(struct obdo *dst, struct lu_attr *la, __u64 valid);
-void la_from_obdo(struct lu_attr *la, struct obdo *dst, obd_flag valid);
+void la_from_obdo(struct lu_attr *la, const struct obdo *dst, obd_flag valid);
 void obdo_refresh_inode(struct inode *dst, struct obdo *src, obd_flag valid);
 void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid);
 #define ll_inode_flags(inode)         (inode->i_flags)
index aa2c142..6755af8 100644 (file)
@@ -55,8 +55,12 @@ struct lu_target {
        rwlock_t                 lut_sptlrpc_lock;
        struct sptlrpc_rule_set  lut_sptlrpc_rset;
        int                      lut_sec_level;
+
+       spinlock_t               lut_flags_lock;
        unsigned int             lut_mds_capa:1,
-                                lut_oss_capa:1;
+                                lut_oss_capa:1,
+                                lut_syncjournal:1,
+                                lut_sync_lock_cancel:2;
 
        /* LAST_RCVD parameters */
        /** last_rcvd file */
@@ -96,15 +100,20 @@ struct tgt_session_info {
        struct lu_target        *tsi_tgt;
 
        const struct mdt_body   *tsi_mdt_body;
+       struct ost_body         *tsi_ost_body;
        struct lu_object        *tsi_corpus;
 
+       struct lu_fid            tsi_fid;
+       struct ldlm_res_id       tsi_resid;
        /*
         * Additional fail id that can be set by handler.
         */
        int                      tsi_reply_fail_id;
        int                      tsi_request_fail_id;
 
-       __u32                    tsi_has_trans:1; /* has txn already? */
+       int                      tsi_has_trans:1; /* has txn already? */
+       /* request JobID */
+       char                    *tsi_jobid;
 };
 
 static inline struct tgt_session_info *tgt_ses_info(const struct lu_env *env)
@@ -163,6 +172,8 @@ struct tgt_handler {
        int                      th_version;
        /* Handler function */
        int                     (*th_act)(struct tgt_session_info *tti);
+       /* Handler function for high priority requests */
+       int                     (*th_hp)(struct tgt_session_info *tti);
        /* Request format for this request */
        const struct req_format *th_fmt;
 };
@@ -196,6 +207,7 @@ char *tgt_name(struct lu_target *tgt);
 void tgt_counter_incr(struct obd_export *exp, int opcode);
 int tgt_connect_check_sptlrpc(struct ptlrpc_request *req,
                              struct obd_export *exp);
+int tgt_adapt_sptlrpc_conf(struct lu_target *tgt, int initial);
 int tgt_connect(struct tgt_session_info *tsi);
 int tgt_disconnect(struct tgt_session_info *uti);
 int tgt_obd_ping(struct tgt_session_info *tsi);
@@ -213,6 +225,25 @@ int tgt_sec_ctx_init(struct tgt_session_info *tsi);
 int tgt_sec_ctx_init_cont(struct tgt_session_info *tsi);
 int tgt_sec_ctx_fini(struct tgt_session_info *tsi);
 int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob);
+int tgt_validate_obdo(struct tgt_session_info *tsi, struct obdo *oa);
+int tgt_sync(const struct lu_env *env, struct lu_target *tgt,
+            struct dt_object *obj);
+
+int tgt_io_thread_init(struct ptlrpc_thread *thread);
+void tgt_io_thread_done(struct ptlrpc_thread *thread);
+
+int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+                   __u64 start, __u64 end, struct lustre_handle *lh,
+                   int mode, __u64 *flags);
+void tgt_extent_unlock(struct lustre_handle *lh, ldlm_mode_t mode);
+int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+                struct obd_ioobj *obj, struct niobuf_remote *nb,
+                struct lustre_handle *lh, int mode);
+void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
+                   struct lustre_handle *lh, int mode);
+int tgt_brw_read(struct tgt_session_info *tsi);
+int tgt_brw_write(struct tgt_session_info *tsi);
+int tgt_hpreq_handler(struct ptlrpc_request *req);
 
 extern struct tgt_handler tgt_sec_ctx_handlers[];
 extern struct tgt_handler tgt_obd_handlers[];
@@ -258,6 +289,10 @@ int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
 int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                         struct dt_object *obj, __u64 opdata,
                         struct thandle *th, struct ptlrpc_request *req);
+int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
+                             struct dt_object *obj, struct thandle *th,
+                             struct obd_export *exp);
+
 enum {
        ESERIOUS = 0x0001000
 };
@@ -280,6 +315,18 @@ static inline int is_serious(int rc)
        return (rc < 0 && -rc & ESERIOUS);
 }
 
+/**
+ * Hide server-side ownership from a remote client.
+ *
+ * When exp_connect_rmtclient() reports true for \a exp, the uid and gid
+ * stored in \a oa are overwritten with -1 and the OBD_MD_FLUID and
+ * OBD_MD_FLGID bits are cleared from o_valid. Otherwise \a oa is left
+ * untouched.
+ */
+static inline void tgt_drop_id(struct obd_export *exp, struct obdo *oa)
+{
+       /* the common case: nothing to hide */
+       if (!unlikely(exp_connect_rmtclient(exp)))
+               return;
+
+       oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
+       oa->o_uid = -1;
+       oa->o_gid = -1;
+}
+
 /*
  * Unified target generic handers macros and generic functions.
  */
@@ -303,6 +350,11 @@ static inline int is_serious(int rc)
        TGT_RPC_HANDLER(MDS_FIRST_OPC, flags, name, fn, NULL,           \
                        LUSTRE_MDS_VERSION)
 
+/* OST request with a format known in advance */
+#define TGT_OST_HDL(flags, name, fn)                                   \
+       TGT_RPC_HANDLER(OST_FIRST_OPC, flags, name, fn, &RQF_ ## name,  \
+                       LUSTRE_OST_VERSION)
+
 /* MGS request with a format known in advance */
 #define TGT_MGS_HDL(flags, name, fn)                                   \
        TGT_RPC_HANDLER(MGS_FIRST_OPC, flags, name, fn, &RQF_ ## name,  \
index 3354618..c64bb8a 100644 (file)
@@ -213,7 +213,7 @@ extern struct req_format RQF_OST_BRW_READ;
 extern struct req_format RQF_OST_BRW_WRITE;
 extern struct req_format RQF_OST_STATFS;
 extern struct req_format RQF_OST_SET_GRANT_INFO;
-extern struct req_format RQF_OST_GET_INFO_GENERIC;
+extern struct req_format RQF_OST_GET_INFO;
 extern struct req_format RQF_OST_GET_INFO_LAST_ID;
 extern struct req_format RQF_OST_GET_INFO_LAST_FID;
 extern struct req_format RQF_OST_SET_INFO_LAST_FID;
index f1beff0..92c1a47 100644 (file)
@@ -312,6 +312,7 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_OST_ENOINO              0x229
 #define OBD_FAIL_OST_DQACQ_NET           0x230
 #define OBD_FAIL_OST_STATFS_EINPROGRESS  0x231
+#define OBD_FAIL_OST_SET_INFO_NET        0x232
 
 #define OBD_FAIL_LDLM                    0x300
 #define OBD_FAIL_LDLM_NAMESPACE_NEW      0x301
index 4e042bb..facc975 100644 (file)
@@ -1996,8 +1996,8 @@ static int target_recovery_thread(void *arg)
         thread->t_env = env;
         thread->t_id = -1; /* force filter_iobuf_get/put to use local buffers */
         env->le_ctx.lc_thread = thread;
-        thread->t_data = NULL;
-        thread->t_watchdog = NULL;
+       tgt_io_thread_init(thread); /* init thread_big_cache for IO requests */
+       thread->t_watchdog = NULL;
 
        CDEBUG(D_HA, "%s: started recovery thread pid %d\n", obd->obd_name,
               current_pid());
@@ -2092,9 +2092,10 @@ static int target_recovery_thread(void *arg)
         trd->trd_processing_task = 0;
        complete(&trd->trd_finishing);
 
-        OBD_FREE_PTR(thread);
-        OBD_FREE_PTR(env);
-        RETURN(rc);
+       tgt_io_thread_done(thread);
+       OBD_FREE_PTR(thread);
+       OBD_FREE_PTR(env);
+       RETURN(rc);
 }
 
 static int target_start_recovery_thread(struct lu_target *lut,
index 3f89faf..971e728 100644 (file)
@@ -4108,30 +4108,6 @@ static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
        EXIT;
 }
 
-static int mdt_adapt_sptlrpc_conf(struct obd_device *obd, int initial)
-{
-       struct mdt_device       *m = mdt_dev(obd->obd_lu_dev);
-       struct sptlrpc_rule_set  tmp_rset;
-       int                      rc;
-
-       sptlrpc_rule_set_init(&tmp_rset);
-       rc = sptlrpc_conf_target_get_rules(obd, &tmp_rset, initial);
-       if (rc) {
-               CERROR("mdt %s: failed get sptlrpc rules: %d\n",
-                      mdt_obd_name(m), rc);
-               return rc;
-       }
-
-       sptlrpc_target_update_exp_flavor(obd, &tmp_rset);
-
-       write_lock(&m->mdt_lut.lut_sptlrpc_lock);
-       sptlrpc_rule_set_free(&m->mdt_lut.lut_sptlrpc_rset);
-       m->mdt_lut.lut_sptlrpc_rset = tmp_rset;
-       write_unlock(&m->mdt_lut.lut_sptlrpc_lock);
-
-       return 0;
-}
-
 int mdt_postrecov(const struct lu_env *, struct mdt_device *);
 
 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
@@ -4292,7 +4268,7 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
        if (rc)
                GOTO(err_tgt, rc);
 
-        mdt_adapt_sptlrpc_conf(obd, 1);
+       tgt_adapt_sptlrpc_conf(&m->mdt_lut, 1);
 
         next = m->mdt_child;
         rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
@@ -4628,18 +4604,16 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
                                   __u32 vallen, void *val,
                                   struct ptlrpc_request_set *set)
 {
-        struct obd_device     *obd = exp->exp_obd;
-        int                    rc;
-        ENTRY;
+       int rc;
 
-        LASSERT(obd);
+       ENTRY;
 
-        if (KEY_IS(KEY_SPTLRPC_CONF)) {
-                rc = mdt_adapt_sptlrpc_conf(obd, 0);
-                RETURN(rc);
-        }
+       if (KEY_IS(KEY_SPTLRPC_CONF)) {
+               rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp), 0);
+               RETURN(rc);
+       }
 
-        RETURN(0);
+       RETURN(0);
 }
 
 /**
index 010caf4..4abb532 100644 (file)
@@ -508,8 +508,10 @@ static int lprocfs_wr_capa(struct file *file, const char *buffer,
                return -EINVAL;
        }
 
+       spin_lock(&mdt->mdt_lut.lut_flags_lock);
        mdt->mdt_lut.lut_oss_capa = !!(val & 0x1);
        mdt->mdt_lut.lut_mds_capa = !!(val & 0x2);
+       spin_unlock(&mdt->mdt_lut.lut_flags_lock);
        mdt->mdt_capa_conf = 1;
        LCONSOLE_INFO("MDS %s %s MDS fid capability.\n",
                      mdt_obd_name(mdt),
index e9f95c0..2f0b54b 100644 (file)
@@ -1236,14 +1236,14 @@ EXPORT_SYMBOL(class_disconnect);
 /* Return non-zero for a fully connected export */
 int class_connected_export(struct obd_export *exp)
 {
+       int connected = 0;
+
        if (exp) {
-               int connected;
                spin_lock(&exp->exp_lock);
-               connected = (exp->exp_conn_cnt > 0);
+               connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
                spin_unlock(&exp->exp_lock);
-               return connected;
        }
-       return 0;
+       return connected;
 }
 EXPORT_SYMBOL(class_connected_export);
 
index b393323..386e73f 100644 (file)
@@ -102,7 +102,7 @@ void obdo_from_la(struct obdo *dst, struct lu_attr *la, __u64 valid)
 EXPORT_SYMBOL(obdo_from_la);
 
 /*FIXME: Just copy from obdo_from_inode*/
-void la_from_obdo(struct lu_attr *dst, struct obdo *obdo, obd_flag valid)
+void la_from_obdo(struct lu_attr *dst, const struct obdo *obdo, obd_flag valid)
 {
         __u64 newvalid = 0;
 
index ff5341f..b15b123 100644 (file)
@@ -2782,6 +2782,9 @@ static int
 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                       void *karg, void *uarg)
 {
+#ifdef HAVE_SERVER_SUPPORT
+       struct tgt_session_info *tsi;
+#endif
         struct obd_device      *obd = exp->exp_obd;
         struct echo_device     *ed = obd2echo_dev(obd);
         struct echo_client_obd *ec = ed->ed_ec;
@@ -2795,6 +2798,9 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         int                     rw = OBD_BRW_READ;
         int                     rc = 0;
         int                     i;
+#ifdef HAVE_SERVER_SUPPORT
+       struct lu_context        echo_session;
+#endif
         ENTRY;
 
         memset(&dummy_oti, 0, sizeof(dummy_oti));
@@ -2814,10 +2820,20 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         if (env == NULL)
                 RETURN(-ENOMEM);
 
-        rc = lu_env_init(env, LCT_DT_THREAD | LCT_MD_THREAD);
-        if (rc)
-                GOTO(out, rc = -ENOMEM);
+       rc = lu_env_init(env, LCT_DT_THREAD);
+       if (rc)
+               GOTO(out_alloc, rc = -ENOMEM);
 
+#ifdef HAVE_SERVER_SUPPORT
+       env->le_ses = &echo_session;
+       rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
+       if (unlikely(rc < 0))
+               GOTO(out_env, rc);
+       lu_context_enter(env->le_ses);
+
+       tsi = tgt_ses_info(env);
+       tsi->tsi_exp = ec->ec_exp;
+#endif
         switch (cmd) {
         case OBD_IOC_CREATE:                    /* may create echo object */
                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
@@ -2989,7 +3005,13 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         EXIT;
 out:
+#ifdef HAVE_SERVER_SUPPORT
+       lu_context_exit(env->le_ses);
+       lu_context_fini(env->le_ses);
+out_env:
+#endif
         lu_env_fini(env);
+out_alloc:
         OBD_FREE_PTR(env);
 
         /* XXX this should be in a helper also called by target_send_reply */
index e11a81b..88e54cb 100644 (file)
@@ -396,11 +396,11 @@ int lprocfs_ofd_rd_sync_lock_cancel(char *page, char **start, off_t off,
                                    int count, int *eof, void *data)
 {
        struct obd_device       *obd = data;
-       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+       struct lu_target        *tgt = obd->u.obt.obt_lut;
        int                      rc;
 
        rc = snprintf(page, count, "%s\n",
-                     sync_on_cancel_states[ofd->ofd_sync_lock_cancel]);
+                     sync_on_cancel_states[tgt->lut_sync_lock_cancel]);
        return rc;
 }
 
@@ -408,7 +408,7 @@ int lprocfs_ofd_wr_sync_lock_cancel(struct file *file, const char *buffer,
                                    unsigned long count, void *data)
 {
        struct obd_device       *obd = data;
-       struct ofd_device       *ofd = ofd_dev(obd->obd_lu_dev);
+       struct lu_target        *tgt = obd->u.obt.obt_lut;
        int                      val = -1;
        int                      i;
 
@@ -430,9 +430,9 @@ int lprocfs_ofd_wr_sync_lock_cancel(struct file *file, const char *buffer,
        if (val < 0 || val > 2)
                return -EINVAL;
 
-       spin_lock(&ofd->ofd_flags_lock);
-       ofd->ofd_sync_lock_cancel = val;
-       spin_unlock(&ofd->ofd_flags_lock);
+       spin_lock(&tgt->lut_flags_lock);
+       tgt->lut_sync_lock_cancel = val;
+       spin_unlock(&tgt->lut_flags_lock);
        return count;
 }
 
@@ -532,16 +532,32 @@ void lprocfs_ofd_init_vars(struct lprocfs_static_vars *lvars)
 
 void ofd_stats_counter_init(struct lprocfs_stats *stats)
 {
-       LASSERT(stats && stats->ls_num == LPROC_OFD_STATS_LAST);
+       LASSERT(stats && stats->ls_num >= LPROC_OFD_STATS_LAST);
+
        lprocfs_counter_init(stats, LPROC_OFD_STATS_READ,
-                            LPROCFS_CNTR_AVGMINMAX, "read", "bytes");
+                            LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
        lprocfs_counter_init(stats, LPROC_OFD_STATS_WRITE,
-                            LPROCFS_CNTR_AVGMINMAX, "write", "bytes");
+                            LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_GETATTR,
+                            0, "getattr", "reqs");
        lprocfs_counter_init(stats, LPROC_OFD_STATS_SETATTR,
                             0, "setattr", "reqs");
        lprocfs_counter_init(stats, LPROC_OFD_STATS_PUNCH,
                             0, "punch", "reqs");
        lprocfs_counter_init(stats, LPROC_OFD_STATS_SYNC,
                             0, "sync", "reqs");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_DESTROY,
+                            0, "destroy", "reqs");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_CREATE,
+                            0, "create", "reqs");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_STATFS,
+                            0, "statfs", "reqs");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_GET_INFO,
+                            0, "get_info", "reqs");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_SET_INFO,
+                            0, "set_info", "reqs");
+       lprocfs_counter_init(stats, LPROC_OFD_STATS_QUOTACTL,
+                            0, "quotactl", "reqs");
 }
+
 #endif /* LPROCFS */
index b7d5348..0079ecc 100644 (file)
@@ -46,6 +46,9 @@
 #include <lustre_param.h>
 #include <lustre_fid.h>
 #include <lustre_lfsck.h>
+#include <lustre/lustre_idl.h>
+#include <lustre_dlm.h>
+#include <lustre_quota.h>
 
 #include "ofd_internal.h"
 
@@ -154,7 +157,6 @@ static void ofd_stack_fini(const struct lu_env *env, struct ofd_device *m,
        ENTRY;
 
        lu_site_purge(env, top->ld_site, ~0);
-
        /* process cleanup, pass mdt obd name to get obd umount flags */
        lustre_cfg_bufs_reset(&bufs, obd->obd_name);
        if (obd->obd_force)
@@ -173,6 +175,10 @@ static void ofd_stack_fini(const struct lu_env *env, struct ofd_device *m,
        lustre_cfg_free(lcfg);
 
        lu_site_purge(env, top->ld_site, ~0);
+       if (!cfs_hash_is_empty(top->ld_site->ls_obj_hash)) {
+               LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
+               lu_site_print(env, top->ld_site, &msgdata, lu_cdebug_printer);
+       }
 
        LASSERT(m->ofd_osd_exp);
        obd_disconnect(m->ofd_osd_exp);
@@ -376,7 +382,7 @@ static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev,
                rc = 0;
        }
 
-       target_recovery_init(&ofd->ofd_lut, ost_handle);
+       target_recovery_init(&ofd->ofd_lut, tgt_request_handle);
        LASSERT(obd->obd_no_conn);
        spin_lock(&obd->obd_dev_lock);
        obd->obd_no_conn = 0;
@@ -435,19 +441,13 @@ static int ofd_procfs_init(struct ofd_device *ofd)
                RETURN(rc);
        }
 
-       rc = lprocfs_alloc_obd_stats(obd, LPROC_OFD_LAST);
+       rc = lprocfs_alloc_obd_stats(obd, LPROC_OFD_STATS_LAST);
        if (rc) {
                CERROR("%s: lprocfs_alloc_obd_stats failed: %d.\n",
                       obd->obd_name, rc);
                GOTO(obd_cleanup, rc);
        }
 
-       /* Init OFD private stats here */
-       lprocfs_counter_init(obd->obd_stats, LPROC_OFD_READ_BYTES,
-                            LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
-       lprocfs_counter_init(obd->obd_stats, LPROC_OFD_WRITE_BYTES,
-                            LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
-
        obd->obd_uses_nid_stats = 1;
 
        entry = lprocfs_register("exports", obd->obd_proc_entry, NULL, NULL);
@@ -469,6 +469,8 @@ static int ofd_procfs_init(struct ofd_device *ofd)
                GOTO(obd_cleanup, rc);
        }
 
+       ofd_stats_counter_init(obd->obd_stats);
+
        rc = lprocfs_job_stats_init(obd, LPROC_OFD_STATS_LAST,
                                    ofd_stats_counter_init);
        if (rc)
@@ -612,11 +614,1014 @@ out_free:
        return rc;
 }
 
+/**
+ * OST_SET_INFO request handler.
+ *
+ * Fetches the key and value from the request capsule and dispatches on the
+ * key: KEY_GRANT_SHRINK, KEY_EVICT_BY_NID, KEY_CAPA_KEY and KEY_SPTLRPC_CONF
+ * are handled, any other key is rejected with -EOPNOTSUPP. Once the key has
+ * been dispatched the LPROC_OFD_STATS_SET_INFO counter is incremented,
+ * whether or not the key-specific processing succeeded.
+ *
+ * \param[in] tsi      target session info of the request being served
+ *
+ * \retval 0           on success
+ * \retval negative    errno on failure; missing request buffers are
+ *                     reported through err_serious()
+ */
+int ofd_set_info_hdl(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct ost_body         *body = NULL, *repbody;
+       void                    *key, *val = NULL;
+       int                      keylen, vallen, rc = 0;
+       bool                     is_grant_shrink;
+       struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
+
+       ENTRY;
+
+       /* note: KEY_IS() below relies on the locals 'key' and 'keylen' */
+       key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
+       if (key == NULL) {
+               DEBUG_REQ(D_HA, req, "no set_info key");
+               RETURN(err_serious(-EFAULT));
+       }
+       keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
+                                     RCL_CLIENT);
+
+       val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
+       if (val == NULL) {
+               DEBUG_REQ(D_HA, req, "no set_info val");
+               RETURN(err_serious(-EFAULT));
+       }
+       vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
+                                     RCL_CLIENT);
+
+       is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK);
+       if (is_grant_shrink)
+               /* In this case the value is actually an RMF_OST_BODY, so we
+                * switch this request over to the matching format */
+               req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO);
+
+       rc = req_capsule_server_pack(tsi->tsi_pill);
+       if (rc < 0)
+               RETURN(rc);
+
+       if (is_grant_shrink) {
+               body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY);
+               if (body == NULL) {
+                       /* do not copy from a buffer the client never sent */
+                       DEBUG_REQ(D_HA, req, "no ost body for grant shrink");
+                       RETURN(err_serious(-EFAULT));
+               }
+
+               repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+               if (repbody == NULL)
+                       RETURN(-ENOMEM);
+               *repbody = *body;
+
+               /** handle grant shrink, similar to a read request */
+               ofd_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp,
+                                      &repbody->oa);
+       } else if (KEY_IS(KEY_EVICT_BY_NID)) {
+               if (vallen > 0)
+                       obd_export_evict_by_nid(tsi->tsi_exp->exp_obd, val);
+               rc = 0;
+       } else if (KEY_IS(KEY_CAPA_KEY)) {
+               rc = ofd_update_capa_key(ofd, val);
+       } else if (KEY_IS(KEY_SPTLRPC_CONF)) {
+               rc = tgt_adapt_sptlrpc_conf(tsi->tsi_tgt, 0);
+       } else {
+               CERROR("%s: Unsupported key %s\n",
+                      tgt_name(tsi->tsi_tgt), (char *)key);
+               rc = -EOPNOTSUPP;
+       }
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SET_INFO,
+                        tsi->tsi_jobid, 1);
+
+       RETURN(rc);
+}
+
+/**
+ * Fill \a fiemap with the extent map of the object identified by \a fid.
+ *
+ * The object is looked up with ofd_object_find() and queried through
+ * dt_fiemap_get() while holding the OFD read lock on it.
+ *
+ * \retval result of dt_fiemap_get() when the object exists
+ * \retval -ENOENT when the object does not exist
+ * \retval PTR_ERR() of the lookup when the object cannot be found
+ */
+static int ofd_fiemap_get(const struct lu_env *env, struct ofd_device *ofd,
+                         struct lu_fid *fid, struct ll_user_fiemap *fiemap)
+{
+       struct ofd_object       *obj = ofd_object_find(env, ofd, fid);
+       int                      result = -ENOENT;
+
+       if (IS_ERR(obj)) {
+               CERROR("%s: error finding object "DFID"\n",
+                      ofd_name(ofd), PFID(fid));
+               return PTR_ERR(obj);
+       }
+
+       ofd_read_lock(env, obj);
+       if (ofd_object_exists(obj))
+               result = dt_fiemap_get(env, ofd_object_child(obj), fiemap);
+       ofd_read_unlock(env, obj);
+
+       ofd_object_put(env, obj);
+       return result;
+}
+
+/* Tracks one extent lock taken by lock_region() so it can be released later. */
+struct locked_region {
+       /* linkage on the caller-provided list of locked regions */
+       cfs_list_t              list;
+       /* lock handle handed to tgt_extent_lock()/tgt_extent_unlock() */
+       struct lustre_handle    lh;
+};
+
+/**
+ * Take a PR extent lock on [\a begin, \a end] and record it on \a locked.
+ *
+ * A tracking entry is allocated for the lock and added to \a locked on
+ * success, so that unlock_zero_regions() can release and free it later.
+ * On failure nothing is added to the list and the entry is freed here.
+ *
+ * \param[in] ns       namespace passed to tgt_extent_lock()
+ * \param[in] res_id   resource to lock
+ * \param[in] begin    first offset of the extent, must be <= \a end
+ * \param[in] end      last offset of the extent
+ * \param[in] locked   list collecting the regions locked so far
+ *
+ * \retval 0           lock taken and recorded
+ * \retval -ENOMEM     tracking entry could not be allocated
+ * \retval other       error returned by tgt_extent_lock()
+ */
+static int lock_region(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+                      unsigned long long begin, unsigned long long end,
+                      cfs_list_t *locked)
+{
+       struct locked_region    *region = NULL;
+       __u64                    flags = 0;
+       int                      rc;
+
+       LASSERT(begin <= end);
+       OBD_ALLOC_PTR(region);
+       if (region == NULL)
+               return -ENOMEM;
+
+       rc = tgt_extent_lock(ns, res_id, begin, end, &region->lh,
+                            LCK_PR, &flags);
+       if (rc != 0) {
+               /* the entry never reaches the list, so free it here */
+               OBD_FREE_PTR(region);
+               return rc;
+       }
+
+       CDEBUG(D_OTHER, "ost lock [%llu,%llu], lh=%p\n", begin, end,
+              &region->lh);
+       cfs_list_add(&region->list, locked);
+
+       return 0;
+}
+
+/**
+ * Lock the unmapped parts of the range described by \a fiemap.
+ *
+ * Walks the fm_mapped_extents extents of \a fiemap starting from fm_start
+ * and calls lock_region() for every gap found in front of an extent, and
+ * for the gap between the end of the last extent and
+ * fm_start + fm_length. Every lock taken is recorded on \a locked.
+ *
+ * On failure the regions locked so far stay on \a locked; releasing them
+ * is left to the caller.
+ *
+ * \retval 0           all gaps are locked (possibly none)
+ * \retval negative    first error returned by lock_region()
+ */
+static int lock_zero_regions(struct ldlm_namespace *ns,
+                            struct ldlm_res_id *res_id,
+                            struct ll_user_fiemap *fiemap,
+                            cfs_list_t *locked)
+{
+       struct ll_fiemap_extent *ext = fiemap->fm_extents;
+       __u64                    cursor = fiemap->fm_start;
+       __u64                    range_end;
+       unsigned int             idx;
+       int                      rc = 0;
+
+       ENTRY;
+
+       CDEBUG(D_OTHER, "extents count %u\n", fiemap->fm_mapped_extents);
+       for (idx = 0; idx < fiemap->fm_mapped_extents; idx++, ext++) {
+               __u64 ext_start = ext->fe_logical;
+
+               /* a gap lies between the cursor and this extent */
+               if (cursor < ext_start) {
+                       CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
+                              cursor, ext_start);
+                       rc = lock_region(ns, res_id, cursor, ext_start,
+                                        locked);
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+
+               cursor = ext_start + ext->fe_length;
+       }
+
+       /* trailing gap, from the last extent to the end of the range */
+       range_end = fiemap->fm_start + fiemap->fm_length;
+       if (cursor < range_end) {
+               CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
+                      cursor, range_end);
+               rc = lock_region(ns, res_id, cursor, range_end, locked);
+       }
+
+       RETURN(rc);
+}
+
+/**
+ * Release every region recorded on \a locked.
+ *
+ * Each entry is unlinked from the list, its PR lock is dropped through
+ * tgt_extent_unlock() and the entry is freed. \a ns is not used here.
+ */
+static void unlock_zero_regions(struct ldlm_namespace *ns, cfs_list_t *locked)
+{
+       struct locked_region    *region;
+       struct locked_region    *next;
+
+       cfs_list_for_each_entry_safe(region, next, locked, list) {
+               CDEBUG(D_OTHER, "ost unlock lh=%p\n", &region->lh);
+               cfs_list_del(&region->list);
+               tgt_extent_unlock(&region->lh, LCK_PR);
+               OBD_FREE_PTR(region);
+       }
+}
+
+/**
+ * OST_GET_INFO request handler.
+ *
+ * Supported keys:
+ * - KEY_LAST_ID:  reply with the last object id of the sequence selected
+ *                 by the export's fed_group;
+ * - KEY_FIEMAP:   reply with the extent map of the object named by the
+ *                 obdo in the key; when the client asked for
+ *                 OBD_FL_SRVLOCK the unmapped regions are PR-locked and
+ *                 the map is fetched a second time (LU-3219);
+ * - KEY_LAST_FID: reply with the last FID of the sequence given in the
+ *                 request.
+ * Any other key is rejected with -EOPNOTSUPP.
+ *
+ * The LPROC_OFD_STATS_GET_INFO counter is incremented on every path that
+ * reaches the end of the function; early RETURNs skip it.
+ *
+ * \param[in] tsi      target session info of the request being served
+ *
+ * \retval 0           on success
+ * \retval negative    errno on failure; malformed requests and reply
+ *                     packing errors are reported through err_serious()
+ */
+int ofd_get_info_hdl(struct tgt_session_info *tsi)
+{
+       struct obd_export               *exp = tsi->tsi_exp;
+       struct ofd_device               *ofd = ofd_exp(exp);
+       struct ofd_thread_info          *fti = tsi2ofd_info(tsi);
+       void                            *key;
+       int                              keylen;
+       int                              replylen, rc = 0;
+
+       ENTRY;
+
+       /* this common part for get_info rpc;
+        * KEY_IS() below relies on the locals 'key' and 'keylen' */
+       key = req_capsule_client_get(tsi->tsi_pill, &RMF_GETINFO_KEY);
+       if (key == NULL) {
+               DEBUG_REQ(D_HA, tgt_ses_req(tsi), "no get_info key");
+               RETURN(err_serious(-EPROTO));
+       }
+       keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_GETINFO_KEY,
+                                     RCL_CLIENT);
+
+       if (KEY_IS(KEY_LAST_ID)) {
+               obd_id          *last_id;
+               struct ofd_seq  *oseq;
+
+               req_capsule_extend(tsi->tsi_pill, &RQF_OST_GET_INFO_LAST_ID);
+               rc = req_capsule_server_pack(tsi->tsi_pill);
+               if (rc)
+                       RETURN(err_serious(rc));
+
+               last_id = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_ID);
+
+               oseq = ofd_seq_load(tsi->tsi_env, ofd,
+                                   (obd_seq)exp->exp_filter_data.fed_group);
+               if (IS_ERR(oseq)) {
+                       /* no sequence was obtained, so there is nothing
+                        * to put back */
+                       rc = -EFAULT;
+               } else {
+                       *last_id = ofd_seq_last_oid(oseq);
+                       ofd_seq_put(tsi->tsi_env, oseq);
+               }
+       } else if (KEY_IS(KEY_FIEMAP)) {
+               struct ll_fiemap_info_key       *fm_key;
+               struct ll_user_fiemap           *fiemap;
+               struct lu_fid                   *fid = &fti->fti_fid;
+
+               req_capsule_extend(tsi->tsi_pill, &RQF_OST_GET_INFO_FIEMAP);
+
+               fm_key = req_capsule_client_get(tsi->tsi_pill, &RMF_FIEMAP_KEY);
+               if (fm_key == NULL)
+                       RETURN(err_serious(-EPROTO));
+
+               rc = tgt_validate_obdo(tsi, &fm_key->oa);
+               if (rc)
+                       RETURN(err_serious(rc));
+
+               replylen = fiemap_count_to_size(fm_key->fiemap.fm_extent_count);
+               req_capsule_set_size(tsi->tsi_pill, &RMF_FIEMAP_VAL,
+                                    RCL_SERVER, replylen);
+
+               rc = req_capsule_server_pack(tsi->tsi_pill);
+               if (rc)
+                       RETURN(err_serious(rc));
+
+               fiemap = req_capsule_server_get(tsi->tsi_pill, &RMF_FIEMAP_VAL);
+               if (fiemap == NULL)
+                       RETURN(-ENOMEM);
+
+               rc = ostid_to_fid(fid, &fm_key->oa.o_oi, 0);
+               if (rc != 0)
+                       RETURN(rc);
+
+               CDEBUG(D_INODE, "get FIEMAP of object "DFID"\n", PFID(fid));
+
+               *fiemap = fm_key->fiemap;
+               rc = ofd_fiemap_get(tsi->tsi_env, ofd, fid, fiemap);
+
+               /* LU-3219: Lock the sparse areas to make sure dirty
+                * flushed back from client, then call fiemap again.
+                * Skipped when the first call failed: the map then still
+                * holds the client's copy, and the error must not be
+                * overwritten. */
+               if (rc == 0 &&
+                   (fm_key->oa.o_valid & OBD_MD_FLFLAGS) &&
+                   (fm_key->oa.o_flags & OBD_FL_SRVLOCK)) {
+                       cfs_list_t locked = CFS_LIST_HEAD_INIT(locked);
+
+                       ost_fid_build_resid(fid, &fti->fti_resid);
+                       rc = lock_zero_regions(ofd->ofd_namespace,
+                                              &fti->fti_resid, fiemap,
+                                              &locked);
+                       if (rc == 0 && !cfs_list_empty(&locked))
+                               rc = ofd_fiemap_get(tsi->tsi_env, ofd, fid,
+                                                   fiemap);
+                       /* release whatever got locked, including the
+                        * regions taken before a partial failure; this
+                        * is a no-op on an empty list */
+                       unlock_zero_regions(ofd->ofd_namespace, &locked);
+               }
+       } else if (KEY_IS(KEY_LAST_FID)) {
+               struct ofd_seq          *oseq;
+               struct lu_fid           *fid;
+
+               /* 'rc' and 'ofd' are the function-level variables, so an
+                * error set below reaches the final RETURN(rc) */
+               req_capsule_extend(tsi->tsi_pill, &RQF_OST_GET_INFO_LAST_FID);
+               rc = req_capsule_server_pack(tsi->tsi_pill);
+               if (rc)
+                       RETURN(err_serious(rc));
+
+               fid = req_capsule_client_get(tsi->tsi_pill, &RMF_FID);
+               if (fid == NULL)
+                       RETURN(err_serious(-EPROTO));
+
+               fid_le_to_cpu(&fti->fti_ostid.oi_fid, fid);
+
+               fid = req_capsule_server_get(tsi->tsi_pill, &RMF_FID);
+               if (fid == NULL)
+                       RETURN(-ENOMEM);
+
+               oseq = ofd_seq_load(tsi->tsi_env, ofd,
+                                   ostid_seq(&fti->fti_ostid));
+               if (IS_ERR(oseq))
+                       RETURN(PTR_ERR(oseq));
+
+               rc = ostid_to_fid(fid, &oseq->os_oi,
+                                 ofd->ofd_lut.lut_lsd.lsd_osd_index);
+               if (rc != 0)
+                       GOTO(out_put, rc);
+
+               CDEBUG(D_HA, "%s: LAST FID is "DFID"\n", ofd_name(ofd),
+                      PFID(fid));
+out_put:
+               ofd_seq_put(tsi->tsi_env, oseq);
+       } else {
+               CERROR("%s: not supported key %s\n", tgt_name(tsi->tsi_tgt),
+                      (char *)key);
+               rc = -EOPNOTSUPP;
+       }
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_GET_INFO,
+                        tsi->tsi_jobid, 1);
+
+       RETURN(rc);
+}
+
+/**
+ * Handler for OST_GETATTR requests (see ofd_tgt_handlers).
+ *
+ * Looks up the object identified by tsi->tsi_fid, reads its attributes and
+ * stores them, together with the object data version when one is available,
+ * in the reply ost_body.
+ *
+ * \param[in] tsi      target session info; tsi->tsi_ost_body must be set
+ *
+ * \retval             0 on success
+ * \retval             -ENOMEM if the reply body is not available
+ * \retval             negative error from tgt_extent_lock(), the object
+ *                     lookup or ofd_attr_get() otherwise
+ */
+static int ofd_getattr_hdl(struct tgt_session_info *tsi)
+{
+       struct ofd_thread_info  *fti = tsi2ofd_info(tsi);
+       struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
+       struct ost_body         *repbody;
+       struct lustre_handle     lh = { 0 };
+       struct ofd_object       *fo;
+       __u64                    flags = 0;
+       ldlm_mode_t              lock_mode = LCK_PR;
+       bool                     srvlock;
+       int                      rc;
+
+       ENTRY;
+
+       LASSERT(tsi->tsi_ost_body != NULL);
+
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+       if (repbody == NULL)
+               RETURN(-ENOMEM);
+
+       /* the reply always echoes the object id of the request */
+       repbody->oa.o_oi = tsi->tsi_ost_body->oa.o_oi;
+       repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+       /* o_flags is only meaningful when OBD_MD_FLFLAGS is set in o_valid */
+       srvlock = tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLFLAGS &&
+                 tsi->tsi_ost_body->oa.o_flags & OBD_FL_SRVLOCK;
+
+       if (srvlock) {
+               /* OBD_FL_FLUSH upgrades the lock from PR to PW */
+               if (unlikely(tsi->tsi_ost_body->oa.o_flags & OBD_FL_FLUSH))
+                       lock_mode = LCK_PW;
+
+               /* lock the whole object: [0, OBD_OBJECT_EOF] */
+               rc = tgt_extent_lock(tsi->tsi_tgt->lut_obd->obd_namespace,
+                                    &tsi->tsi_resid, 0, OBD_OBJECT_EOF, &lh,
+                                    lock_mode, &flags);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       fo = ofd_object_find_exists(tsi->tsi_env, ofd, &tsi->tsi_fid);
+       if (IS_ERR(fo))
+               GOTO(out, rc = PTR_ERR(fo));
+
+       rc = ofd_attr_get(tsi->tsi_env, fo, &fti->fti_attr);
+       if (rc == 0) {
+               __u64    curr_version;
+
+               obdo_from_la(&repbody->oa, &fti->fti_attr,
+                            OFD_VALID_FLAGS | LA_UID | LA_GID);
+               tgt_drop_id(tsi->tsi_exp, &repbody->oa);
+
+               /* Store object version in reply */
+               curr_version = dt_version_get(tsi->tsi_env,
+                                             ofd_object_child(fo));
+               /* a value of -EOPNOTSUPP means no version can be reported */
+               if ((__s64)curr_version != -EOPNOTSUPP) {
+                       repbody->oa.o_valid |= OBD_MD_FLDATAVERSION;
+                       repbody->oa.o_data_version = curr_version;
+               }
+       }
+
+       ofd_object_put(tsi->tsi_env, fo);
+out:
+       if (srvlock)
+               tgt_extent_unlock(&lh, lock_mode);
+
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_GETATTR,
+                        tsi->tsi_jobid, 1);
+
+       /* set on every path reaching here, including the error paths */
+       repbody->oa.o_valid |= OBD_MD_FLFLAGS;
+       repbody->oa.o_flags = OBD_FL_FLUSH;
+
+       RETURN(rc);
+}
+
+/**
+ * Handler for OST_SETATTR requests (see ofd_tgt_handlers).
+ *
+ * Applies the attributes carried in the request obdo to the object
+ * identified by tsi->tsi_fid and returns the resulting attributes in the
+ * reply. A request that carries a size other than OBD_OBJECT_EOF is
+ * rejected so that a setattr can never truncate an object.
+ *
+ * \param[in] tsi      target session info; tsi->tsi_ost_body must be set
+ *
+ * \retval             0 on success
+ * \retval             -ENOMEM if the reply body is not available
+ * \retval             -EPERM if the request would truncate the object
+ * \retval             negative error from the object lookup or
+ *                     ofd_attr_set() otherwise
+ */
+static int ofd_setattr_hdl(struct tgt_session_info *tsi)
+{
+       struct ofd_thread_info  *fti = tsi2ofd_info(tsi);
+       struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
+       struct ost_body         *body = tsi->tsi_ost_body;
+       struct ost_body         *repbody;
+       struct ldlm_resource    *res;
+       struct ofd_object       *fo;
+       struct filter_fid       *ff = NULL;
+       int                      rc = 0;
+
+       ENTRY;
+
+       LASSERT(body != NULL);
+
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+       if (repbody == NULL)
+               RETURN(-ENOMEM);
+
+       repbody->oa.o_oi = body->oa.o_oi;
+       repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+       /* This would be very bad - accidentally truncating a file when
+        * changing the time or similar - bug 12203. */
+       if (body->oa.o_valid & OBD_MD_FLSIZE &&
+           body->oa.o_size != OBD_OBJECT_EOF) {
+               /* per-call buffer: a function-static one would be shared
+                * by every thread running this handler concurrently */
+               char mdsinum[48];
+
+               if (body->oa.o_valid & OBD_MD_FLFID)
+                       snprintf(mdsinum, sizeof(mdsinum),
+                                "of parent "DFID, body->oa.o_parent_seq,
+                                body->oa.o_parent_oid, 0);
+               else
+                       mdsinum[0] = '\0';
+
+               CERROR("%s: setattr from %s is trying to truncate object "DFID
+                      " %s\n", ofd_name(ofd), obd_export_nid2str(tsi->tsi_exp),
+                      PFID(&tsi->tsi_fid), mdsinum);
+               RETURN(-EPERM);
+       }
+
+       fo = ofd_object_find_exists(tsi->tsi_env, ofd, &tsi->tsi_fid);
+       if (IS_ERR(fo))
+               GOTO(out, rc = PTR_ERR(fo));
+
+       la_from_obdo(&fti->fti_attr, &body->oa, body->oa.o_valid);
+       /* the object type is never changed by a setattr */
+       fti->fti_attr.la_valid &= ~LA_TYPE;
+
+       /* pass the parent FID along only when the request carries one */
+       if (body->oa.o_valid & OBD_MD_FLFID) {
+               ff = &fti->fti_mds_fid;
+               ofd_prepare_fidea(ff, &body->oa);
+       }
+
+       /* setting objects attributes (including owner/group) */
+       rc = ofd_attr_set(tsi->tsi_env, fo, &fti->fti_attr, ff);
+       if (rc != 0)
+               GOTO(out_put, rc);
+
+       obdo_from_la(&repbody->oa, &fti->fti_attr,
+                    OFD_VALID_FLAGS | LA_UID | LA_GID);
+       tgt_drop_id(tsi->tsi_exp, &repbody->oa);
+
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SETATTR,
+                        tsi->tsi_jobid, 1);
+       EXIT;
+out_put:
+       ofd_object_put(tsi->tsi_env, fo);
+out:
+       if (rc == 0) {
+               /* we do not call this before to avoid lu_object_find() in
+                *  ->lvbo_update() holding another reference on the object.
+                * otherwise concurrent destroy can make the object unavailable
+                * for 2nd lu_object_find() waiting for the first reference
+                * to go... deadlock! */
+               res = ldlm_resource_get(ofd->ofd_namespace, NULL,
+                                       &tsi->tsi_resid, LDLM_EXTENT, 0);
+               if (res != NULL) {
+                       ldlm_res_lvbo_update(res, NULL, 0);
+                       ldlm_resource_putref(res);
+               }
+       }
+       return rc;
+}
+
+/**
+ * Handler for OST_CREATE requests (see ofd_tgt_handlers).
+ *
+ * Depending on o_flags in the request this function handles three cases:
+ * - OBD_FL_RECREATE_OBJS: only validates the request, nothing is created
+ *   here;
+ * - OBD_FL_DELORPHAN: orphan cleanup, objects above the requested id are
+ *   destroyed through ofd_orphans_destroy();
+ * - otherwise: precreate objects up to the requested id.
+ *
+ * When objects are to be precreated (diff > 0) they are created in batches
+ * and the last object id of the sequence is returned in the reply.
+ *
+ * \param[in] tsi      target session info
+ *
+ * \retval             0 on success, including the case when only part of
+ *                     the requested objects were created
+ * \retval             -EROFS if the OBD_FAIL_OST_EROFS fail point is set
+ * \retval             -ENOMEM if the reply body is not available
+ * \retval             -EINVAL if the sequence cannot be loaded or the
+ *                     request is not valid
+ * \retval             negative error from precreate otherwise
+ */
+static int ofd_create_hdl(struct tgt_session_info *tsi)
+{
+       struct ost_body         *repbody;
+       const struct obdo       *oa = &tsi->tsi_ost_body->oa;
+       struct obdo             *rep_oa;
+       struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
+       obd_seq                  seq = ostid_seq(&oa->o_oi);
+       obd_id                   oid = ostid_id(&oa->o_oi);
+       struct ofd_seq          *oseq;
+       int                      rc = 0, diff;
+       int                      sync_trans = 0;
+
+       ENTRY;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
+               RETURN(-EROFS);
+
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+       if (repbody == NULL)
+               RETURN(-ENOMEM);
+
+       rep_oa = &repbody->oa;
+       rep_oa->o_oi = oa->o_oi;
+
+       LASSERT(seq >= FID_SEQ_OST_MDT0);
+       LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+       CDEBUG(D_INFO, "ofd_create("DOSTID")\n", POSTID(&oa->o_oi));
+
+       /* the reference taken here is dropped by ofd_seq_put() at out_nolock */
+       oseq = ofd_seq_load(tsi->tsi_env, ofd, seq);
+       if (IS_ERR(oseq)) {
+               CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n",
+                      ofd_name(ofd), seq, PTR_ERR(oseq));
+               RETURN(-EINVAL);
+       }
+
+       if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+           (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+               if (!ofd_obd(ofd)->obd_recovering ||
+                   oid > ofd_seq_last_oid(oseq)) {
+                       CERROR("%s: recreate objid "DOSTID" > last id "LPU64
+                              "\n", ofd_name(ofd), POSTID(&oa->o_oi),
+                              ofd_seq_last_oid(oseq));
+                       GOTO(out_nolock, rc = -EINVAL);
+               }
+               /* Do nothing here, we re-create objects during recovery
+                * upon write replay, see ofd_preprw_write() */
+               GOTO(out_nolock, rc = 0);
+       }
+       /* former ofd_handle_precreate */
+       if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+           (oa->o_flags & OBD_FL_DELORPHAN)) {
+               /* destroy orphans */
+               if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
+                   tsi->tsi_exp->exp_conn_cnt) {
+                       CERROR("%s: dropping old orphan cleanup request\n",
+                              ofd_name(ofd));
+                       GOTO(out_nolock, rc = 0);
+               }
+               /* This causes inflight precreates to abort and drop lock */
+               oseq->os_destroys_in_progress = 1;
+               mutex_lock(&oseq->os_create_lock);
+               /* the flag was cleared while this thread waited for the
+                * mutex: report the current last id and do nothing else */
+               if (!oseq->os_destroys_in_progress) {
+                       CERROR("%s:["LPU64"] destroys_in_progress already"
+                              " cleared\n", ofd_name(ofd), seq);
+                       ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq));
+                       GOTO(out, rc = 0);
+               }
+               /* diff < 0: objects above the requested id are orphans;
+                * diff > 0: objects are missing and get precreated below */
+               diff = oid - ofd_seq_last_oid(oseq);
+               CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n",
+                       ofd_seq_last_oid(oseq), diff);
+               if (-diff > OST_MAX_PRECREATE) {
+                       /* FIXME: should reset precreate_next_id on MDS */
+                       rc = 0;
+               } else if (diff < 0) {
+                       rc = ofd_orphans_destroy(tsi->tsi_env, tsi->tsi_exp,
+                                                ofd, rep_oa);
+                       oseq->os_destroys_in_progress = 0;
+               } else {
+                       /* XXX: Used by MDS for the first time! */
+                       oseq->os_destroys_in_progress = 0;
+               }
+       } else {
+               mutex_lock(&oseq->os_create_lock);
+               if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
+                   tsi->tsi_exp->exp_conn_cnt) {
+                       CERROR("%s: dropping old precreate request\n",
+                              ofd_name(ofd));
+                       GOTO(out, rc = 0);
+               }
+               /* only precreate if seq is 0, IDIF or normal and also o_id
+                * must be specified */
+               if ((!fid_seq_is_mdt(seq) && !fid_seq_is_norm(seq) &&
+                    !fid_seq_is_idif(seq)) || oid == 0) {
+                       diff = 1; /* shouldn't we create this right now? */
+               } else {
+                       diff = oid - ofd_seq_last_oid(oseq);
+                       /* Do sync create if the seq is about to be used up */
+                       if (fid_seq_is_idif(seq) || fid_seq_is_mdt0(seq)) {
+                               if (unlikely(oid >= IDIF_MAX_OID - 1))
+                                       sync_trans = 1;
+                       } else if (fid_seq_is_norm(seq)) {
+                               if (unlikely(oid >=
+                                            LUSTRE_DATA_SEQ_MAX_WIDTH - 1))
+                                       sync_trans = 1;
+                       } else {
+                               CERROR("%s : invalid o_seq "DOSTID"\n",
+                                      ofd_name(ofd), POSTID(&oa->o_oi));
+                               GOTO(out, rc = -EINVAL);
+                       }
+               }
+       }
+       /* os_create_lock is held from here until the out label */
+       if (diff > 0) {
+               cfs_time_t       enough_time = cfs_time_shift(DISK_TIMEOUT);
+               obd_id           next_id;
+               int              created = 0;
+               int              count;
+
+               if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
+                   !(oa->o_flags & OBD_FL_DELORPHAN)) {
+                       /* don't enforce grant during orphan recovery */
+                       rc = ofd_grant_create(tsi->tsi_env,
+                                             ofd_obd(ofd)->obd_self_export,
+                                             &diff);
+                       if (rc) {
+                               CDEBUG(D_HA, "%s: failed to acquire grant "
+                                      "space for precreate (%d): rc = %d\n",
+                                      ofd_name(ofd), diff, rc);
+                               diff = 0;
+                       }
+               }
+
+               /* This can happen if a new OST is formatted and installed
+                * in place of an old one at the same index.  Instead of
+                * precreating potentially millions of deleted old objects
+                * (possibly filling the OST), only precreate the last batch.
+                * LFSCK will eventually clean up any orphans. LU-14 */
+               if (diff > 5 * OST_MAX_PRECREATE) {
+                       diff = OST_MAX_PRECREATE / 2;
+                       LCONSOLE_WARN("%s: precreate FID "DOSTID" is over %u "
+                                     "larger than the LAST_ID "DOSTID", only "
+                                     "precreating the last %u objects.\n",
+                                     ofd_name(ofd), POSTID(&oa->o_oi),
+                                     5 * OST_MAX_PRECREATE,
+                                     POSTID(&oseq->os_oi), diff);
+                       ofd_seq_last_oid_set(oseq, ostid_id(&oa->o_oi) - diff);
+               }
+
+               /* create in batches until done, an error occurs or the
+                * DISK_TIMEOUT deadline computed above has passed */
+               while (diff > 0) {
+                       next_id = ofd_seq_last_oid(oseq) + 1;
+                       count = ofd_precreate_batch(ofd, diff);
+
+                       CDEBUG(D_HA, "%s: reserve %d objects in group "LPX64
+                              " at "LPU64"\n", ofd_name(ofd),
+                              count, seq, next_id);
+
+                       if (cfs_time_after(jiffies, enough_time)) {
+                               LCONSOLE_WARN("%s: Slow creates, %d/%d objects"
+                                             " created at a rate of %d/s\n",
+                                             ofd_name(ofd), created,
+                                             diff + created,
+                                             created / DISK_TIMEOUT);
+                               break;
+                       }
+
+                       /* a positive rc is the number of objects created */
+                       rc = ofd_precreate_objects(tsi->tsi_env, ofd, next_id,
+                                                  oseq, count, sync_trans);
+                       if (rc > 0) {
+                               created += rc;
+                               diff -= rc;
+                       } else if (rc < 0) {
+                               break;
+                       }
+               }
+               if (created > 0)
+                       /* some objects got created, we can return
+                        * them, even if last creation failed */
+                       rc = 0;
+               else
+                       CERROR("%s: unable to precreate: rc = %d\n",
+                              ofd_name(ofd), rc);
+
+               if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
+                   !(oa->o_flags & OBD_FL_DELORPHAN))
+                       ofd_grant_commit(tsi->tsi_env,
+                                        ofd_obd(ofd)->obd_self_export, rc);
+
+               ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq));
+       }
+       EXIT;
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_CREATE,
+                        tsi->tsi_jobid, 1);
+out:
+       mutex_unlock(&oseq->os_create_lock);
+out_nolock:
+       if (rc == 0)
+               rep_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
+
+       ofd_seq_put(tsi->tsi_env, oseq);
+       return rc;
+}
+
+/**
+ * Handler for OST_DESTROY requests (see ofd_tgt_handlers).
+ *
+ * Destroys one object, or o_misc consecutive objects when the request has
+ * OBD_MD_FLOBJCOUNT set, starting from the object id in the request. If the
+ * request carries a DLM request buffer, the locks listed in it are
+ * cancelled first.
+ *
+ * \param[in] tsi      target session info
+ *
+ * \retval             0 on success, or when some objects did not exist but
+ *                     a transaction was still made
+ * \retval             -EROFS if the OBD_FAIL_OST_EROFS fail point is set
+ * \retval             -EFAULT if the DLM request buffer cannot be read
+ * \retval             -ENOMEM if the reply body is not available
+ * \retval             -ENOENT if no object existed and there was no
+ *                     transaction
+ * \retval             negative error of the last failed destroy otherwise
+ */
+static int ofd_destroy_hdl(struct tgt_session_info *tsi)
+{
+       const struct ost_body   *body = tsi->tsi_ost_body;
+       struct ost_body         *repbody;
+       struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
+       struct ofd_thread_info  *fti = tsi2ofd_info(tsi);
+       obd_count                count;
+       int                      rc = 0;
+
+       ENTRY;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
+               RETURN(-EROFS);
+
+       /* This is old case for clients before Lustre 2.4 */
+       /* If there's a DLM request, cancel the locks mentioned in it */
+       if (req_capsule_field_present(tsi->tsi_pill, &RMF_DLM_REQ,
+                                     RCL_CLIENT)) {
+               struct ldlm_request *dlm;
+
+               dlm = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
+               if (dlm == NULL)
+                       RETURN(-EFAULT);
+               ldlm_request_cancel(tgt_ses_req(tsi), dlm, 0);
+       }
+
+       /* same check as in the other handlers, the body is used below */
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+       if (repbody == NULL)
+               RETURN(-ENOMEM);
+
+       repbody->oa.o_oi = body->oa.o_oi;
+
+       /* check that o_misc makes sense */
+       if (body->oa.o_valid & OBD_MD_FLOBJCOUNT)
+               count = body->oa.o_misc;
+       else
+               count = 1; /* default case - single destroy */
+
+       /**
+        * There can be sequence of objects to destroy. Therefore this request
+        * may have multiple transaction involved in. It is OK, we need only
+        * the highest used transno to be reported back in reply but not for
+        * replays, they must report their transno
+        */
+       if (fti->fti_transno == 0) /* not replay */
+               fti->fti_mult_trans = 1;
+
+       CDEBUG(D_HA, "%s: Destroy object "DOSTID" count %d\n", ofd_name(ofd),
+              POSTID(&body->oa.o_oi), count);
+       /* the id in the reply body is used as the cursor over the range */
+       while (count > 0) {
+               int lrc;
+
+               lrc = ostid_to_fid(&fti->fti_fid, &repbody->oa.o_oi, 0);
+               if (lrc != 0) {
+                       if (rc == 0)
+                               rc = lrc;
+                       GOTO(out, rc);
+               }
+               lrc = ofd_destroy_by_fid(tsi->tsi_env, ofd, &fti->fti_fid, 0);
+               if (lrc == -ENOENT) {
+                       CDEBUG(D_INODE,
+                              "%s: destroying non-existent object "DFID"\n",
+                              ofd_name(ofd), PFID(&fti->fti_fid));
+                       /* rewrite rc with -ENOENT only if it is 0 */
+                       if (rc == 0)
+                               rc = lrc;
+               } else if (lrc != 0) {
+                       /* report the error of this destroy (lrc), not the
+                        * status accumulated from the previous objects */
+                       CERROR("%s: error destroying object "DFID": %d\n",
+                              ofd_name(ofd), PFID(&fti->fti_fid),
+                              lrc);
+                       rc = lrc;
+               }
+               count--;
+               ostid_inc_id(&repbody->oa.o_oi);
+       }
+
+       /* if we have transaction then there were some deletions, we don't
+        * need to return ENOENT in that case because it will not wait
+        * for commit of these deletions. The ENOENT must be returned only
+        * if there were no transactions.
+        */
+       if (rc == -ENOENT) {
+               if (fti->fti_transno != 0)
+                       rc = 0;
+       } else if (rc != 0) {
+               /*
+                * If we have at least one transaction then llog record
+                * on server will be removed upon commit, so for rc != 0
+                * we return no transno and llog record will be reprocessed.
+                */
+               fti->fti_transno = 0;
+       }
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_DESTROY,
+                        tsi->tsi_jobid, 1);
+out:
+       RETURN(rc);
+}
+
+/**
+ * Handler for OST_STATFS requests (see ofd_tgt_handlers).
+ *
+ * Fills the obd_statfs buffer of the reply through ofd_statfs().
+ *
+ * \param[in] tsi      target session info
+ *
+ * \retval             0 on success
+ * \retval             -EINPROGRESS if the OBD_FAIL_OST_STATFS_EINPROGRESS
+ *                     fail point is set
+ * \retval             negative error from ofd_statfs() otherwise
+ */
+static int ofd_statfs_hdl(struct tgt_session_info *tsi)
+{
+       struct obd_statfs       *sfs;
+       __u64                    max_age;
+       int                      rc;
+
+       ENTRY;
+
+       sfs = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_STATFS);
+
+       /* data older than OBD_STATFS_CACHE_SECONDS is not accepted */
+       max_age = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
+       rc = ofd_statfs(tsi->tsi_env, tsi->tsi_exp, sfs, max_age, 0);
+       if (rc != 0) {
+               CERROR("%s: statfs failed: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), rc);
+       }
+
+       /* the fail point replaces whatever status was returned above */
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS)) {
+               rc = -EINPROGRESS;
+       }
+
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_STATFS,
+                        tsi->tsi_jobid, 1);
+
+       RETURN(rc);
+}
+
+/**
+ * Handler for OST_SYNC requests (see ofd_tgt_handlers).
+ *
+ * Syncs a single object when tsi->tsi_fid is set, or the whole target when
+ * it is zero. For a single object its attributes are returned in the reply.
+ *
+ * \param[in] tsi      target session info
+ *
+ * \retval             0 on success
+ * \retval             negative error from the object lookup or tgt_sync()
+ */
+static int ofd_sync_hdl(struct tgt_session_info *tsi)
+{
+       struct ost_body         *body = tsi->tsi_ost_body;
+       struct ost_body         *repbody;
+       struct ofd_thread_info  *fti = tsi2ofd_info(tsi);
+       struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
+       struct ofd_object       *fo = NULL;
+       int                      rc = 0;
+
+       ENTRY;
+
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+
+       /* a zero FID stands for "sync whole filesystem", no object then */
+       if (!fid_is_zero(&tsi->tsi_fid)) {
+               fo = ofd_object_find_exists(tsi->tsi_env, ofd, &tsi->tsi_fid);
+               if (IS_ERR(fo))
+                       RETURN(PTR_ERR(fo));
+       }
+
+       rc = tgt_sync(tsi->tsi_env, tsi->tsi_tgt,
+                     fo != NULL ? ofd_object_child(fo) : NULL);
+       if (rc != 0)
+               GOTO(put, rc);
+
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SYNC,
+                        tsi->tsi_jobid, 1);
+
+       /* nothing to report back for a whole filesystem sync */
+       if (fo == NULL)
+               RETURN(0);
+
+       repbody->oa.o_oi = body->oa.o_oi;
+       repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+       if (ofd_attr_get(tsi->tsi_env, fo, &fti->fti_attr) == 0)
+               obdo_from_la(&repbody->oa, &fti->fti_attr,
+                            OFD_VALID_FLAGS);
+
+       /* a getattr failure is not an error of the sync itself */
+       rc = 0;
+       EXIT;
+put:
+       if (fo != NULL)
+               ofd_object_put(tsi->tsi_env, fo);
+       return rc;
+}
+
+/**
+ * Handler for OST_PUNCH requests (see ofd_tgt_handlers).
+ *
+ * Truncates the object identified by tsi->tsi_fid to the size passed in
+ * o_size. Only truncate is supported, so the end of the punched range,
+ * passed in o_blocks, must be OBD_OBJECT_EOF.
+ *
+ * \param[in] tsi      target session info
+ *
+ * \retval             0 on success
+ * \retval             -EPROTO if the request lacks size/blocks or the end
+ *                     of the range is not OBD_OBJECT_EOF
+ * \retval             -ENOMEM if the reply body is not available
+ * \retval             negative error from tgt_extent_lock(), the object
+ *                     lookup or ofd_object_punch() otherwise
+ */
+static int ofd_punch_hdl(struct tgt_session_info *tsi)
+{
+       const struct obdo       *oa = &tsi->tsi_ost_body->oa;
+       struct ost_body         *repbody;
+       struct ofd_thread_info  *info = tsi2ofd_info(tsi);
+       struct ldlm_namespace   *ns = tsi->tsi_tgt->lut_obd->obd_namespace;
+       struct ldlm_resource    *res;
+       struct ofd_object       *fo;
+       struct filter_fid       *ff = NULL;
+       __u64                    flags = 0;
+       struct lustre_handle     lh = { 0, };
+       int                      rc;
+       __u64                    start, end;
+       bool                     srvlock;
+
+       ENTRY;
+
+       /* check that we do support OBD_CONNECT_TRUNCLOCK. */
+       CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
+
+       /* both ends of the range must be present in the request */
+       if ((oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
+           (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
+               RETURN(err_serious(-EPROTO));
+
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+       if (repbody == NULL)
+               RETURN(err_serious(-ENOMEM));
+
+       /* punch start,end are passed in o_size,o_blocks through the wire */
+       start = oa->o_size;
+       end = oa->o_blocks;
+
+       if (end != OBD_OBJECT_EOF) /* Only truncate is supported */
+               RETURN(-EPROTO);
+
+       /* standard truncate optimization: if file body is completely
+        * destroyed, don't send data back to the server. */
+       if (start == 0)
+               flags |= LDLM_FL_AST_DISCARD_DATA;
+
+       repbody->oa.o_oi = oa->o_oi;
+       repbody->oa.o_valid = OBD_MD_FLID;
+
+       /* o_flags is only meaningful when OBD_MD_FLFLAGS is set in o_valid */
+       srvlock = oa->o_valid & OBD_MD_FLFLAGS &&
+                 oa->o_flags & OBD_FL_SRVLOCK;
+
+       if (srvlock) {
+               /* PW lock over the punched range, dropped at the out label */
+               rc = tgt_extent_lock(ns, &tsi->tsi_resid, start, end, &lh,
+                                    LCK_PW, &flags);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       CDEBUG(D_INODE, "calling punch for object "DFID", valid = "LPX64
+              ", start = "LPD64", end = "LPD64"\n", PFID(&tsi->tsi_fid),
+              oa->o_valid, start, end);
+
+       fo = ofd_object_find_exists(tsi->tsi_env, ofd_exp(tsi->tsi_exp),
+                                   &tsi->tsi_fid);
+       if (IS_ERR(fo))
+               GOTO(out, rc = PTR_ERR(fo));
+
+       /* only the timestamps are taken from the request, the new size is
+        * the start of the punched range */
+       la_from_obdo(&info->fti_attr, oa,
+                    OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME);
+       info->fti_attr.la_size = start;
+       info->fti_attr.la_valid |= LA_SIZE;
+
+       /* pass the parent FID along only when the request carries one */
+       if (oa->o_valid & OBD_MD_FLFID) {
+               ff = &info->fti_mds_fid;
+               ofd_prepare_fidea(ff, oa);
+       }
+
+       rc = ofd_object_punch(tsi->tsi_env, fo, start, end, &info->fti_attr,
+                             ff);
+       if (rc)
+               GOTO(out_put, rc);
+
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_PUNCH,
+                        tsi->tsi_jobid, 1);
+       EXIT;
+out_put:
+       ofd_object_put(tsi->tsi_env, fo);
+out:
+       if (srvlock)
+               tgt_extent_unlock(&lh, LCK_PW);
+       if (rc == 0) {
+               /* we do not call this before to avoid lu_object_find() in
+                *  ->lvbo_update() holding another reference on the object.
+                * otherwise concurrent destroy can make the object unavailable
+                * for 2nd lu_object_find() waiting for the first reference
+                * to go... deadlock! */
+               res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid,
+                                       LDLM_EXTENT, 0);
+               if (res != NULL) {
+                       ldlm_res_lvbo_update(res, NULL, 0);
+                       ldlm_resource_putref(res);
+               }
+       }
+       return rc;
+}
+
+
+/**
+ * Handler for OST_QUOTACTL requests (see ofd_tgt_handlers).
+ *
+ * Copies the quotactl of the request into the reply and passes it to
+ * lquotactl_slv(). Q_QUOTAON and Q_QUOTAOFF are not processed at all and
+ * always succeed.
+ *
+ * \param[in] tsi      target session info
+ *
+ * \retval             0 on success
+ * \retval             -EPROTO if the request has no quotactl buffer
+ * \retval             -ENOMEM if the reply has no quotactl buffer
+ * \retval             negative error from lquotactl_slv() otherwise
+ */
+static int ofd_quotactl(struct tgt_session_info *tsi)
+{
+       struct obd_quotactl     *req_qc;
+       struct obd_quotactl     *rep_qc;
+       int                      rc;
+
+       ENTRY;
+
+       req_qc = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_QUOTACTL);
+       if (req_qc == NULL)
+               RETURN(err_serious(-EPROTO));
+
+       rep_qc = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_QUOTACTL);
+       if (rep_qc == NULL)
+               RETURN(err_serious(-ENOMEM));
+
+       /* quota on/off is reported as done, this keeps interoperability
+        * with the current MDT stack */
+       if (req_qc->qc_cmd == Q_QUOTAON)
+               RETURN(0);
+       if (req_qc->qc_cmd == Q_QUOTAOFF)
+               RETURN(0);
+
+       /* the slave works on the reply copy, the request stays intact */
+       *rep_qc = *req_qc;
+       rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom, rep_qc);
+
+       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_QUOTACTL,
+                        tsi->tsi_jobid, 1);
+
+       RETURN(rc);
+}
+
+/*
+ * Aliases used by the OST_BRW_READ/OST_BRW_WRITE entries of the table
+ * below: both opcodes share the OBD_FAIL_OST_BRW_NET fail point.
+ * NOTE(review): presumably TGT_OST_HDL() builds the OBD_FAIL_<opc>_NET name
+ * from the opcode name, which is why the *_NET aliases are needed - confirm
+ * against the macro definition.
+ */
+#define OBD_FAIL_OST_READ_NET  OBD_FAIL_OST_BRW_NET
+#define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
+#define OST_BRW_READ   OST_READ
+#define OST_BRW_WRITE  OST_WRITE
+
+/*
+ * Handlers of the OST opcodes served by OFD, one entry per opcode with the
+ * request flags and the handler function. The table is referenced by the
+ * OST_FIRST_OPC..OST_LAST_OPC slice of ofd_common_slice[].
+ * NOTE(review): the meaning of HABEO_CORPUS, HABEO_REFERO and MUTABOR is
+ * defined by the target code outside of this file - confirm there before
+ * changing the flags of an entry.
+ */
+static struct tgt_handler ofd_tgt_handlers[] = {
+TGT_RPC_HANDLER(OST_FIRST_OPC,
+               0,                      OST_CONNECT,    tgt_connect,
+               &RQF_CONNECT, LUSTRE_OBD_VERSION),
+TGT_RPC_HANDLER(OST_FIRST_OPC,
+               0,                      OST_DISCONNECT, tgt_disconnect,
+               &RQF_OST_DISCONNECT, LUSTRE_OBD_VERSION),
+TGT_RPC_HANDLER(OST_FIRST_OPC,
+               0,                      OST_SET_INFO,   ofd_set_info_hdl,
+               &RQF_OBD_SET_INFO, LUSTRE_OST_VERSION),
+TGT_OST_HDL(0,                         OST_GET_INFO,   ofd_get_info_hdl),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO,        OST_GETATTR,    ofd_getattr_hdl),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
+                                       OST_SETATTR,    ofd_setattr_hdl),
+TGT_OST_HDL(0          | HABEO_REFERO | MUTABOR,
+                                       OST_CREATE,     ofd_create_hdl),
+TGT_OST_HDL(0          | HABEO_REFERO | MUTABOR,
+                                       OST_DESTROY,    ofd_destroy_hdl),
+TGT_OST_HDL(0          | HABEO_REFERO, OST_STATFS,     ofd_statfs_hdl),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO,        OST_BRW_READ,   tgt_brw_read),
+/* don't set CORPUS flag for brw_write because -ENOENT may be valid case */
+TGT_OST_HDL(MUTABOR,                   OST_BRW_WRITE,  tgt_brw_write),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
+                                       OST_PUNCH,      ofd_punch_hdl),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO,        OST_SYNC,       ofd_sync_hdl),
+TGT_OST_HDL(0          | HABEO_REFERO, OST_QUOTACTL,   ofd_quotactl),
+};
+
 static struct tgt_opc_slice ofd_common_slice[] = {
        {
-               .tos_opc_start = UPDATE_OBJ,
-               .tos_opc_end   = UPDATE_LAST_OPC,
-               .tos_hs        = tgt_out_handlers
+               .tos_opc_start  = OST_FIRST_OPC,
+               .tos_opc_end    = OST_LAST_OPC,
+               .tos_hs         = ofd_tgt_handlers
+       },
+       {
+               .tos_opc_start  = OBD_FIRST_OPC,
+               .tos_opc_end    = OBD_LAST_OPC,
+               .tos_hs         = tgt_obd_handlers
+       },
+       {
+               .tos_opc_start  = LDLM_FIRST_OPC,
+               .tos_opc_end    = LDLM_LAST_OPC,
+               .tos_hs         = tgt_dlm_handlers
+       },
+       {
+               .tos_opc_start  = UPDATE_OBJ,
+               .tos_opc_end    = UPDATE_LAST_OPC,
+               .tos_hs         = tgt_out_handlers
        },
        {
                .tos_opc_start  = SEQ_FIRST_OPC,
index 5d1b880..1667d0d 100644 (file)
@@ -72,37 +72,37 @@ struct ofd_mod_data {
 #define OFD_FMD_MAX_NUM_DEFAULT 128
 #define OFD_FMD_MAX_AGE_DEFAULT ((obd_timeout + 10) * HZ)
 
-enum {
-       LPROC_OFD_READ_BYTES = 0,
-       LPROC_OFD_WRITE_BYTES = 1,
-       LPROC_OFD_LAST,
-};
-
-/* for job stats */
+/* request stats */
 enum {
        LPROC_OFD_STATS_READ = 0,
-       LPROC_OFD_STATS_WRITE = 1,
-       LPROC_OFD_STATS_SETATTR = 2,
-       LPROC_OFD_STATS_PUNCH = 3,
-       LPROC_OFD_STATS_SYNC = 4,
+       LPROC_OFD_STATS_WRITE,
+       LPROC_OFD_STATS_GETATTR,
+       LPROC_OFD_STATS_SETATTR,
+       LPROC_OFD_STATS_PUNCH,
+       LPROC_OFD_STATS_SYNC,
+       LPROC_OFD_STATS_DESTROY,
+       LPROC_OFD_STATS_CREATE,
+       LPROC_OFD_STATS_STATFS,
+       LPROC_OFD_STATS_GET_INFO,
+       LPROC_OFD_STATS_SET_INFO,
+       LPROC_OFD_STATS_QUOTACTL,
        LPROC_OFD_STATS_LAST,
 };
 
 static inline void ofd_counter_incr(struct obd_export *exp, int opcode,
                                    char *jobid, long amount)
 {
+       if (exp->exp_obd && exp->exp_obd->obd_stats)
+               lprocfs_counter_add(exp->exp_obd->obd_stats, opcode, amount);
+
        if (exp->exp_obd && exp->exp_obd->u.obt.obt_jobstats.ojs_hash &&
            (exp_connect_flags(exp) & OBD_CONNECT_JOBSTATS))
                lprocfs_job_stats_log(exp->exp_obd, jobid, opcode, amount);
 
        if (exp->exp_nid_stats != NULL &&
            exp->exp_nid_stats->nid_stats != NULL) {
-               if (opcode == LPROC_OFD_STATS_READ)
-                       lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
-                                           LPROC_OFD_READ_BYTES, amount);
-               else if (opcode == LPROC_OFD_STATS_WRITE)
-                       lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
-                                           LPROC_OFD_WRITE_BYTES, amount);
+               lprocfs_counter_add(exp->exp_nid_stats->nid_stats, opcode,
+                                   amount);
        }
 }
 
@@ -180,8 +180,6 @@ struct ofd_device {
        unsigned long            ofd_raid_degraded:1,
                                 /* sync journal on writes */
                                 ofd_syncjournal:1,
-                                /* sync on lock cancel */
-                                ofd_sync_lock_cancel:2,
                                 /* shall we grant space to clients not
                                  * supporting OBD_CONNECT_GRANT_PARAM? */
                                 ofd_grant_compat_disable:1;
@@ -338,6 +336,12 @@ extern struct obd_ops ofd_obd_ops;
 int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
                        struct obd_statfs *osfs, __u64 max_age,
                        int *from_cache);
+int ofd_orphans_destroy(const struct lu_env *env, struct obd_export *exp,
+                       struct ofd_device *ofd, struct obdo *oa);
+int ofd_destroy_by_fid(const struct lu_env *env, struct ofd_device *ofd,
+                      const struct lu_fid *fid, int orphan);
+int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
+              struct obd_statfs *osfs, __u64 max_age, __u32 flags);
 
 /* ofd_fs.c */
 obd_id ofd_seq_last_oid(struct ofd_seq *oseq);
@@ -415,6 +419,21 @@ int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
 int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo,
                         struct lu_attr *la, int is_setattr);
 
+static inline
+struct ofd_object *ofd_object_find_exists(const struct lu_env *env,
+                                         struct ofd_device *ofd,
+                                         struct lu_fid *fid)
+{
+       struct ofd_object *fo;
+
+       fo = ofd_object_find(env, ofd, fid);
+       if (!IS_ERR(fo) && !ofd_object_exists(fo)) {
+               ofd_object_put(env, fo);
+               fo = ERR_PTR(-ENOENT);
+       }
+       return fo;
+}
+
 /* ofd_grants.c */
 #define OFD_GRANT_RATIO_SHIFT 8
 static inline __u64 ofd_grant_reserved(struct ofd_device *ofd, obd_size bavail)
@@ -499,24 +518,22 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
                      void *req_cookie, ldlm_mode_t mode, __u64 flags,
                      void *data);
 
-static inline struct ofd_thread_info * ofd_info(const struct lu_env *env)
+static inline struct ofd_thread_info *ofd_info(const struct lu_env *env)
 {
        struct ofd_thread_info *info;
 
+       lu_env_refill((void *)env);
        info = lu_context_key_get(&env->le_ctx, &ofd_thread_key);
        LASSERT(info);
-       LASSERT(info->fti_env);
-       LASSERT(info->fti_env == env);
        return info;
 }
 
-static inline struct ofd_thread_info * ofd_info_init(const struct lu_env *env,
-                                                    struct obd_export *exp)
+static inline struct ofd_thread_info *ofd_info_init(const struct lu_env *env,
+                                                   struct obd_export *exp)
 {
        struct ofd_thread_info *info;
 
-       info = lu_context_key_get(&env->le_ctx, &ofd_thread_key);
-       LASSERT(info);
+       info = ofd_info(env);
        LASSERT(info->fti_exp == NULL);
        LASSERT(info->fti_env == NULL);
        LASSERT(info->fti_attr.la_valid == 0);
@@ -529,6 +546,32 @@ static inline struct ofd_thread_info * ofd_info_init(const struct lu_env *env,
        return info;
 }
 
+static inline struct ofd_thread_info *tsi2ofd_info(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct ofd_thread_info  *info;
+
+       info = ofd_info(tsi->tsi_env);
+       LASSERT(info->fti_exp == NULL);
+       LASSERT(info->fti_env == NULL);
+       LASSERT(info->fti_attr.la_valid == 0);
+
+       info->fti_env = tsi->tsi_env;
+       info->fti_exp = tsi->tsi_exp;
+       info->fti_has_trans = 0;
+
+       info->fti_xid = req->rq_xid;
+       /** VBR: take versions from request */
+       if (req->rq_reqmsg != NULL &&
+           lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+               __u64 *pre_version = lustre_msg_get_versions(req->rq_reqmsg);
+
+               info->fti_pre_version = pre_version ? pre_version[0] : 0;
+               info->fti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+       }
+       return info;
+}
+
 static inline void ofd_oti2info(struct ofd_thread_info *info,
                                struct obd_trans_info *oti)
 {
@@ -555,15 +598,14 @@ static inline void ofd_info2oti(struct ofd_thread_info *info,
 static inline void ofd_slc_set(struct ofd_device *ofd)
 {
        if (ofd->ofd_syncjournal == 1)
-               ofd->ofd_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
-       else if (ofd->ofd_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
-               ofd->ofd_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
+               ofd->ofd_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
+       else if (ofd->ofd_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
+               ofd->ofd_lut.lut_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
 }
 
-static inline void ofd_prepare_fidea(struct filter_fid *ff, struct obdo *oa)
+static inline void ofd_prepare_fidea(struct filter_fid *ff,
+                                    const struct obdo *oa)
 {
-       if (!(oa->o_valid & OBD_MD_FLGROUP))
-               ostid_set_seq_mdt0(&oa->o_oi);
        /* packing fid and converting it to LE for storing into EA.
         * Here ->o_stripe_idx should be filled by LOV and rest of
         * fields - by client. */
index b90bc8a..a67e6c7 100644 (file)
@@ -46,8 +46,7 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
                           struct ofd_device *ofd, struct lu_fid *fid,
                           struct lu_attr *la, int niocount,
                           struct niobuf_remote *rnb, int *nr_local,
-                          struct niobuf_local *lnb,
-                          struct obd_trans_info *oti)
+                          struct niobuf_local *lnb, char *jobid)
 {
        struct ofd_object       *fo;
        int                      i, j, rc, tot_bytes = 0;
@@ -88,10 +87,8 @@ static int ofd_preprw_read(const struct lu_env *env, struct obd_export *exp,
        rc = dt_read_prep(env, ofd_object_child(fo), lnb, *nr_local);
        if (unlikely(rc))
                GOTO(buf_put, rc);
-       lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
-                           LPROC_OFD_READ_BYTES, tot_bytes);
-       ofd_counter_incr(exp, LPROC_OFD_STATS_READ,
-                        oti->oti_jobid, tot_bytes);
+
+       ofd_counter_incr(exp, LPROC_OFD_STATS_READ, jobid, tot_bytes);
        RETURN(0);
 
 buf_put:
@@ -107,8 +104,7 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                            struct lu_attr *la, struct obdo *oa,
                            int objcount, struct obd_ioobj *obj,
                            struct niobuf_remote *rnb, int *nr_local,
-                           struct niobuf_local *lnb,
-                           struct obd_trans_info *oti)
+                           struct niobuf_local *lnb, char *jobid)
 {
        struct ofd_object       *fo;
        int                      i, j, k, rc = 0, tot_bytes = 0;
@@ -152,9 +148,6 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                GOTO(out, rc = -ENOENT);
        }
 
-       /* Always sync if syncjournal parameter is set */
-       oti->oti_sync_write = ofd->ofd_syncjournal;
-
        /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
         * space back if possible */
        ofd_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
@@ -173,8 +166,6 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
                        lnb[j+k].lnb_flags = rnb[i].rnb_flags;
                        if (!(rnb[i].rnb_flags & OBD_BRW_GRANTED))
                                lnb[j+k].lnb_rc = -ENOSPC;
-                       if (!(rnb[i].rnb_flags & OBD_BRW_ASYNC))
-                               oti->oti_sync_write = 1;
                        /* remote client can't break through quota */
                        if (exp_connect_rmtclient(exp))
                                lnb[j+k].lnb_flags &= ~OBD_BRW_NOQUOTA;
@@ -190,10 +181,7 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp,
        if (unlikely(rc != 0))
                GOTO(err, rc);
 
-       lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
-                           LPROC_OFD_WRITE_BYTES, tot_bytes);
-       ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE,
-                        oti->oti_jobid, tot_bytes);
+       ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid, tot_bytes);
        RETURN(0);
 err:
        dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
@@ -208,14 +196,16 @@ out:
        return rc;
 }
 
-int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
+int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
               struct obdo *oa, int objcount, struct obd_ioobj *obj,
               struct niobuf_remote *rnb, int *nr_local,
               struct niobuf_local *lnb, struct obd_trans_info *oti,
               struct lustre_capa *capa)
 {
+       struct tgt_session_info *tsi = tgt_ses_info(env);
        struct ofd_device       *ofd = ofd_exp(exp);
        struct ofd_thread_info  *info;
+       char                    *jobid;
        int                      rc = 0;
 
        if (*nr_local > PTLRPC_MAX_BRW_PAGES) {
@@ -225,14 +215,22 @@ int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
                RETURN(-EPROTO);
        }
 
-       rc = lu_env_refill((struct lu_env *)env);
-       LASSERT(rc == 0);
-       info = ofd_info_init(env, exp);
+       if (tgt_ses_req(tsi) == NULL) { /* echo client case */
+               LASSERT(oti != NULL);
+               lu_env_refill((struct lu_env *)env);
+               info = ofd_info_init(env, exp);
+               ofd_oti2info(info, oti);
+               jobid = oti->oti_jobid;
+       } else {
+               info = tsi2ofd_info(tsi);
+               jobid = tsi->tsi_jobid;
+       }
 
        LASSERT(oa != NULL);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) {
                struct ofd_seq          *oseq;
+
                oseq = ofd_seq_load(env, ofd, ostid_seq(&oa->o_oi));
                if (IS_ERR(oseq)) {
                        CERROR("%s: Can not find seq for "DOSTID
@@ -265,7 +263,7 @@ int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
                        la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR);
                        rc = ofd_preprw_write(env, exp, ofd, &info->fti_fid,
                                              &info->fti_attr, oa, objcount,
-                                             obj, rnb, nr_local, lnb, oti);
+                                             obj, rnb, nr_local, lnb, jobid);
                }
        } else if (cmd == OBD_BRW_READ) {
                rc = ofd_auth_capa(exp, &info->fti_fid, ostid_seq(&oa->o_oi),
@@ -274,7 +272,7 @@ int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
                        ofd_grant_prepare_read(env, exp, oa);
                        rc = ofd_preprw_read(env, exp, ofd, &info->fti_fid,
                                             &info->fti_attr, obj->ioo_bufcnt,
-                                            rnb, nr_local, lnb, oti);
+                                            rnb, nr_local, lnb, jobid);
                        obdo_from_la(oa, &info->fti_attr, LA_ATIME);
                }
        } else {
@@ -401,8 +399,7 @@ static int
 ofd_commitrw_write(const struct lu_env *env, struct ofd_device *ofd,
                   struct lu_fid *fid, struct lu_attr *la,
                   struct filter_fid *ff, int objcount,
-                  int niocount, struct niobuf_local *lnb,
-                  struct obd_trans_info *oti, int old_rc)
+                  int niocount, struct niobuf_local *lnb, int old_rc)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_object       *fo;
@@ -410,6 +407,7 @@ ofd_commitrw_write(const struct lu_env *env, struct ofd_device *ofd,
        struct thandle          *th;
        int                      rc = 0;
        int                      retries = 0;
+       int                      i;
 
        ENTRY;
 
@@ -442,7 +440,15 @@ retry:
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
-       th->th_sync |= oti->oti_sync_write;
+       th->th_sync |= ofd->ofd_syncjournal;
+       if (th->th_sync == 0) {
+               for (i = 0; i < niocount; i++) {
+                       if (!(lnb[i].lnb_flags & OBD_BRW_ASYNC)) {
+                               th->th_sync = 1;
+                               break;
+                       }
+               }
+       }
 
        if (OBD_FAIL_CHECK(OBD_FAIL_OST_DQACQ_NET))
                GOTO(out_stop, rc = -EINPROGRESS);
@@ -504,16 +510,13 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                 struct niobuf_local *lnb, struct obd_trans_info *oti,
                 int old_rc)
 {
-       struct ofd_thread_info  *info;
+       struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_mod_data     *fmd;
        __u64                    valid;
        struct ofd_device       *ofd = ofd_exp(exp);
        struct filter_fid       *ff = NULL;
        int                      rc = 0;
 
-       info = ofd_info(env);
-       ofd_oti2info(info, oti);
-
        LASSERT(npages > 0);
 
        rc = ostid_to_fid(&info->fti_fid, &oa->o_oi, 0);
@@ -542,7 +545,7 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
 
                rc = ofd_commitrw_write(env, ofd, &info->fti_fid,
                                        &info->fti_attr, ff, objcount, npages,
-                                       lnb, oti, old_rc);
+                                       lnb, old_rc);
                if (rc == 0)
                        obdo_from_la(oa, &info->fti_attr,
                                     OFD_VALID_FLAGS | LA_GID | LA_UID);
@@ -588,7 +591,7 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                        }
                }
                rc = ofd_commitrw_read(env, ofd, &info->fti_fid, objcount,
-                                         npages, lnb);
+                                      npages, lnb);
                if (old_rc)
                        rc = old_rc;
        } else {
@@ -596,6 +599,7 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
                rc = -EPROTO;
        }
 
-       ofd_info2oti(info, oti);
+       if (oti != NULL)
+               ofd_info2oti(info, oti);
        RETURN(rc);
 }
index 8bcbd06..d1dffb5 100644 (file)
@@ -79,18 +79,15 @@ static int ofd_export_stats_init(struct ofd_device *ofd,
        stats = exp->exp_nid_stats;
        LASSERT(stats != NULL);
 
-       num_stats = NUM_OBD_STATS + LPROC_OFD_LAST;
+       num_stats = NUM_OBD_STATS + LPROC_OFD_STATS_LAST;
+
        stats->nid_stats = lprocfs_alloc_stats(num_stats,
                                               LPROCFS_STATS_FLAG_NOPERCPU);
        if (stats->nid_stats == NULL)
                return -ENOMEM;
 
-       lprocfs_init_ops_stats(LPROC_OFD_LAST, stats->nid_stats);
-       lprocfs_counter_init(stats->nid_stats, LPROC_OFD_READ_BYTES,
-                            LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
-       lprocfs_counter_init(stats->nid_stats, LPROC_OFD_WRITE_BYTES,
-                            LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
-
+       lprocfs_init_ops_stats(LPROC_OFD_STATS_LAST, stats->nid_stats);
+       ofd_stats_counter_init(stats->nid_stats);
        rc = lprocfs_register_stats(stats->nid_proc, "stats",
                                    stats->nid_stats);
        if (rc)
@@ -255,13 +252,6 @@ static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp,
 
        ofd = ofd_dev(obd->obd_lu_dev);
 
-       rc = lu_env_refill((struct lu_env *)env);
-       if (rc != 0) {
-               CERROR("Failure to refill session: '%d'\n", rc);
-               RETURN(rc);
-       }
-
-       ofd_info_init(env, exp);
        rc = ofd_parse_connect_data(env, exp, data, false);
        if (rc == 0)
                ofd_export_stats_init(ofd, exp, localdata);
@@ -291,14 +281,6 @@ static int ofd_obd_connect(const struct lu_env *env, struct obd_export **_exp,
        exp = class_conn2export(&conn);
        LASSERT(exp != NULL);
 
-       rc = lu_env_refill((struct lu_env *)env);
-       if (rc != 0) {
-               CERROR("Failure to refill session: '%d'\n", rc);
-               GOTO(out, rc);
-       }
-
-       ofd_info_init(env, exp);
-
        rc = ofd_parse_connect_data(env, exp, data, true);
        if (rc)
                GOTO(out, rc);
@@ -345,16 +327,17 @@ static int ofd_obd_disconnect(struct obd_export *exp)
 
        ofd_grant_discard(exp);
 
-       rc = lu_env_init(&env, LCT_DT_THREAD);
-       if (rc)
-               RETURN(rc);
-
        /* Do not erase record for recoverable client. */
        if (exp->exp_obd->obd_replayable &&
-           (!exp->exp_obd->obd_fail || exp->exp_failed))
-               tgt_client_del(&env, exp);
-       lu_env_fini(&env);
+           (!exp->exp_obd->obd_fail || exp->exp_failed)) {
+               rc = lu_env_init(&env, LCT_DT_THREAD);
+               if (rc)
+                       GOTO(out, rc);
 
+               tgt_client_del(&env, exp);
+               lu_env_fini(&env);
+       }
+out:
        class_export_put(exp);
        RETURN(rc);
 }
@@ -624,9 +607,6 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp,
                        ofd_read_unlock(env, fo);
                        ofd_object_put(env, fo);
                }
-       } else if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) {
-               *((__u32 *) val) = ofd->ofd_sync_lock_cancel;
-               *vallen = sizeof(__u32);
        } else if (KEY_IS(KEY_LAST_FID)) {
                struct ofd_device       *ofd = ofd_exp(exp);
                struct ofd_seq          *oseq;
@@ -749,8 +729,8 @@ int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
        return 0;
 }
 
-static int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
-                     struct obd_statfs *osfs, __u64 max_age, __u32 flags)
+int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
+              struct obd_statfs *osfs, __u64 max_age, __u32 flags)
 {
         struct obd_device      *obd = class_exp2obd(exp);
        struct ofd_device       *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
@@ -996,9 +976,8 @@ out_env:
        return rc;
 }
 
-static int ofd_destroy_by_fid(const struct lu_env *env,
-                             struct ofd_device *ofd,
-                             const struct lu_fid *fid, int orphan)
+int ofd_destroy_by_fid(const struct lu_env *env, struct ofd_device *ofd,
+                      const struct lu_fid *fid, int orphan)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct lustre_handle     lockh;
@@ -1121,9 +1100,8 @@ out:
        RETURN(rc);
 }
 
-static int ofd_orphans_destroy(const struct lu_env *env,
-                              struct obd_export *exp, struct ofd_device *ofd,
-                              struct obdo *oa)
+int ofd_orphans_destroy(const struct lu_env *env, struct obd_export *exp,
+                       struct ofd_device *ofd, struct obdo *oa)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        obd_id                   last;
index 9103704..a14afe2 100644 (file)
@@ -67,7 +67,7 @@ int ofd_trans_start(const struct lu_env *env, struct ofd_device *ofd,
        struct ofd_thread_info  *info = ofd_info(env);
        int                      rc;
 
-       if (info->fti_exp == NULL)
+       if (env->le_ses == NULL || info->fti_exp == NULL)
                return 0;
 
        /* declare last_rcvd update */
@@ -102,109 +102,27 @@ void ofd_trans_stop(const struct lu_env *env, struct ofd_device *ofd,
        dt_trans_stop(env, ofd->ofd_osd, th);
 }
 
-/*
- * last_rcvd & last_committed update callbacks
- */
-static int ofd_last_rcvd_update(struct ofd_thread_info *info,
-                               struct thandle *th)
-{
-       struct ofd_device               *ofd = ofd_exp(info->fti_exp);
-       struct filter_export_data       *fed;
-       struct lsd_client_data          *lcd;
-       __s32                            rc = th->th_result;
-       __u64                           *transno_p;
-       loff_t                           off;
-       int                              err;
-       bool                             lw_client = false;
-
-       ENTRY;
-
-       LASSERT(ofd);
-       LASSERT(info->fti_exp);
-
-       if (exp_connect_flags(info->fti_exp) & OBD_CONNECT_LIGHTWEIGHT)
-               lw_client = true;
-
-       fed = &info->fti_exp->exp_filter_data;
-       LASSERT(fed);
-       lcd = fed->fed_ted.ted_lcd;
-       /* if the export has already been disconnected, we have no last_rcvd
-        * slot, update server data with latest transno then */
-       if (lcd == NULL) {
-               CWARN("commit transaction for disconnected client %s: rc %d\n",
-                     info->fti_exp->exp_client_uuid.uuid, rc);
-               err = tgt_server_data_write(info->fti_env, &ofd->ofd_lut, th);
-               RETURN(err);
-       }
-       /* ofd connect may cause transaction before export has last_rcvd
-        * slot */
-       if (fed->fed_ted.ted_lr_idx < 0 && !lw_client)
-               RETURN(0);
-       off = fed->fed_ted.ted_lr_off;
-
-       transno_p = &lcd->lcd_last_transno;
-       lcd->lcd_last_xid = info->fti_xid;
-
-       /*
-        * When we store zero transno in mcd we can lost last transno value
-        * because mcd contains 0, but msd is not yet written
-        * The server data should be updated also if the latest
-        * transno is rewritten by zero. See the bug 11125 for details.
-        */
-       if (info->fti_transno == 0 &&
-           *transno_p == ofd->ofd_lut.lut_last_transno) {
-               spin_lock(&ofd->ofd_lut.lut_translock);
-               ofd->ofd_lut.lut_lsd.lsd_last_transno =
-                                               ofd->ofd_lut.lut_last_transno;
-               spin_unlock(&ofd->ofd_lut.lut_translock);
-               tgt_server_data_write(info->fti_env, &ofd->ofd_lut, th);
-       }
-
-       *transno_p = info->fti_transno;
-       if (lw_client) {
-               /* Although lightweight (LW) connections have no slot in
-                * last_rcvd, we still want to maintain the in-memory
-                * lsd_client_data structure in order to properly handle reply
-                * reconstruction. */
-               struct lu_target        *tg =&ofd->ofd_lut;
-               bool                     update = false;
-
-               err = 0;
-               /* All operations performed by LW clients are synchronous and
-                * we store the committed transno in the last_rcvd header */
-               spin_lock(&tg->lut_translock);
-               if (info->fti_transno > tg->lut_lsd.lsd_last_transno) {
-                       tg->lut_lsd.lsd_last_transno = info->fti_transno;
-                       update = true;
-               }
-               spin_unlock(&tg->lut_translock);
-               if (update)
-                       err = tgt_server_data_write(info->fti_env, tg, th);
-       } else {
-               LASSERT(fed->fed_ted.ted_lr_off > 0);
-               err = tgt_client_data_write(info->fti_env, &ofd->ofd_lut, lcd,
-                                   &off, th);
-       }
-
-       RETURN(err);
-}
-
 /* Update last_rcvd records with the latest transaction data */
 int ofd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
                    void *cookie)
 {
-       struct ofd_device *ofd = cookie;
-       struct ofd_thread_info *info;
+       struct ofd_device       *ofd = cookie;
+       struct ofd_thread_info  *info = ofd_info(env);
+       struct dt_object        *obj;
+       struct tgt_session_info *tsi;
+       bool                     echo_client;
+       int                      rc;
 
        ENTRY;
 
-       info = lu_context_key_get(&env->le_ctx, &ofd_thread_key);
+       if (env->le_ses == NULL || info->fti_exp == NULL)
+               RETURN(0);
+
+       tsi = tgt_ses_info(env);
 
-       if (info->fti_exp == NULL)
-                RETURN(0);
+       echo_client = (tgt_ses_req(tsi) == NULL);
 
-       LASSERT(ofd_exp(info->fti_exp) == ofd);
-       if (info->fti_has_trans) {
+       if (info->fti_has_trans && !echo_client) {
                if (info->fti_mult_trans == 0) {
                        CERROR("More than one transaction "LPU64"\n",
                               info->fti_transno);
@@ -216,38 +134,18 @@ int ofd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
                info->fti_has_trans = 1;
        }
 
-       spin_lock(&ofd->ofd_lut.lut_translock);
-       if (txn->th_result != 0) {
-               if (info->fti_transno != 0) {
-                       CERROR("Replay transno "LPU64" failed: rc %d\n",
-                              info->fti_transno, txn->th_result);
-                       info->fti_transno = 0;
-               }
-       } else if (info->fti_transno == 0) {
-               info->fti_transno = ++ofd->ofd_lut.lut_last_transno;
-       } else {
-               /* should be replay */
-               if (info->fti_transno > ofd->ofd_lut.lut_last_transno)
-                       ofd->ofd_lut.lut_last_transno = info->fti_transno;
-       }
-       spin_unlock(&ofd->ofd_lut.lut_translock);
-
        /** VBR: set new versions */
-       if (txn->th_result == 0 && info->fti_obj != NULL) {
-               dt_version_set(env, ofd_object_child(info->fti_obj),
-                              info->fti_transno, txn);
-               info->fti_obj = NULL;
-       }
-
-       /* filling reply data */
-       CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
-              info->fti_transno, ofd_obd(ofd)->obd_last_committed);
-
-       /* if can't add callback, do sync write */
-       txn->th_sync |= !!tgt_last_commit_cb_add(txn, &ofd->ofd_lut,
-                                                info->fti_exp,
-                                                info->fti_transno);
-
-       return ofd_last_rcvd_update(info, txn);
+       if (info->fti_obj != NULL)
+               obj = ofd_object_child(info->fti_obj);
+       else
+               obj = NULL;
+
+       if (unlikely(echo_client)) /* echo client special case */
+               rc = tgt_last_rcvd_update_echo(env, &ofd->ofd_lut, obj, txn,
+                                              tsi->tsi_exp);
+       else
+               rc = tgt_last_rcvd_update(env, &ofd->ofd_lut, obj, 0, txn,
+                                         tgt_ses_req(tsi));
+       RETURN(rc);
 }
 
index 5b8065a..20783fc 100644 (file)
@@ -539,24 +539,22 @@ static int osp_get_lastfid_from_ost(const struct lu_env *env,
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, RCL_CLIENT,
+       req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY, RCL_CLIENT,
                             sizeof(KEY_LAST_FID));
 
-       req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL, RCL_CLIENT,
-                            sizeof(struct lu_fid));
-
        rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
        if (rc) {
                ptlrpc_request_free(req);
                RETURN(rc);
        }
 
-       tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
        memcpy(tmp, KEY_LAST_FID, sizeof(KEY_LAST_FID));
 
        req->rq_no_delay = req->rq_no_resend = 1;
-       tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
-       fid_cpu_to_le((struct lu_fid *)tmp, &d->opd_last_used_fid);
+       last_fid = req_capsule_client_get(&req->rq_pill, &RMF_FID);
+       fid_cpu_to_le(last_fid, &d->opd_last_used_fid);
+
        ptlrpc_request_set_replen(req);
 
        rc = ptlrpc_queue_wait(req);
index ae982d0..1adc42f 100644 (file)
 #define DEBUG_SUBSYSTEM S_OST
 
 #include <linux/module.h>
-#include <obd_cksum.h>
 #include <obd_ost.h>
-#include <lustre_net.h>
 #include <lustre_dlm.h>
-#include <lustre_export.h>
-#include <lustre_debug.h>
-#include <lustre_fid.h>
-#include <lustre_fld.h>
-#include <linux/init.h>
 #include <lprocfs_status.h>
-#include <libcfs/list.h>
-#include <lustre_quota.h>
-#include <lustre_fid.h>
 #include "ost_internal.h"
-#include <lustre_fid.h>
 
 static int oss_num_threads;
 CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
@@ -78,1808 +67,76 @@ static char *oss_io_cpts;
 CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
                "CPU partitions OSS IO threads should run on");
 
-/*
- * this page is allocated statically when module is initializing
- * it is used to simulate data corruptions, see ost_checksum_bulk()
- * for details. as the original pages provided by the layers below
- * can be remain in the internal cache, we do not want to modify
- * them.
- */
-static struct page *ost_page_to_corrupt = NULL;
-
-/**
- * Do not return server-side uid/gid to remote client
- */
-static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
-{
-        if (exp_connect_rmtclient(exp)) {
-                oa->o_uid = -1;
-                oa->o_gid = -1;
-                oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
-        }
-}
-
-/**
- * Validate oa from client.
- * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
- * req are valid.
- *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
- *    b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
- *       pack ost_id. Because non-zero oi_seq will make it diffcult to tell
- *       whether this is oi_fid or real ostid. So it will check
- *       OBD_CONNECT_FID, then convert the ostid to FID for old client.
- *    c. Old FID-disable osc will send IDIF.
- *    d. new FID-enable osc/osp will send normal FID.
- *
- * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
- * be used for LAST_ID file, and only being accessed inside OST now.
- */
-static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
-                            struct obd_ioobj *ioobj)
-{
-       int rc = 0;
-
-       if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
-                    fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
-               /* Sigh 2.[123] client still sends echo req with oi_id = 0
-                * during create, and we will reset this to 1, since this
-                * oi_id is basically useless in the following create process,
-                * but oi_id == 0 will make it difficult to tell whether it is
-                * real FID or ost_id. */
-               oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
-               oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
-               oa->o_oi.oi_fid.f_ver = 0;
-       } else {
-               if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
-                       GOTO(out, rc = -EPROTO);
-
-               /* Note: this check might be forced in 2.5 or 2.6, i.e.
-                * all of the requests are required to setup FLGROUP */
-               if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
-                       ostid_set_seq_mdt0(&oa->o_oi);
-                       if (ioobj)
-                               ostid_set_seq_mdt0(&ioobj->ioo_oid);
-                       oa->o_valid |= OBD_MD_FLGROUP;
-               }
-
-               if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
-                              fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
-                       GOTO(out, rc = -EPROTO);
-       }
-
-       if (ioobj != NULL) {
-               unsigned max_brw = ioobj_max_brw_get(ioobj);
-
-               if (unlikely((max_brw & (max_brw - 1)) != 0)) {
-                       CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
-                              ": rc = -EPROTO\n", exp->exp_obd->obd_name,
-                              obd_export_nid2str(exp), max_brw,
-                              POSTID(&oa->o_oi));
-                       GOTO(out, rc = -EPROTO);
-               }
-               ioobj->ioo_oid = oa->o_oi;
-       }
-
-out:
-       if (rc != 0)
-               CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
-                      exp->exp_obd->obd_name, obd_export_nid2str(exp),
-                      oa ? ostid_seq(&oa->o_oi) : -1,
-                      oa ? ostid_id(&oa->o_oi) : -1, rc);
-       return rc;
-}
-
-void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
-{
-        struct oti_req_ack_lock *ack_lock;
-        int i;
-
-        if (oti == NULL)
-                return;
-
-        if (req->rq_repmsg) {
-                __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
-                lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
-                versions[0] = oti->oti_pre_version;
-                lustre_msg_set_versions(req->rq_repmsg, versions);
-        }
-        req->rq_transno = oti->oti_transno;
-
-        /* XXX 4 == entries in oti_ack_locks??? */
-        for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
-                if (!ack_lock->mode)
-                        break;
-                /* XXX not even calling target_send_reply in some cases... */
-                ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
-        }
-}
-
-static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
-                       struct obd_trans_info *oti)
-{
-        struct ost_body *body, *repbody;
-        struct lustre_capa *capa = NULL;
-        int rc;
-        ENTRY;
-
-        /* Get the request body */
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
-
-       if (ostid_id(&body->oa.o_oi) == 0)
-               RETURN(-EPROTO);
-
-        rc = ost_validate_obdo(exp, &body->oa, NULL);
-        if (rc)
-                RETURN(rc);
-
-        /* If there's a DLM request, cancel the locks mentioned in it*/
-        if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
-                struct ldlm_request *dlm;
-
-                dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
-                if (dlm == NULL)
-                        RETURN (-EFAULT);
-                ldlm_request_cancel(req, dlm, 0);
-        }
-
-        /* If there's a capability, get it */
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
-                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
-                if (capa == NULL) {
-                        CERROR("Missing capability for OST DESTROY");
-                        RETURN (-EFAULT);
-                }
-        }
-
-        /* Prepare the reply */
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        /* Get the log cancellation cookie */
-        if (body->oa.o_valid & OBD_MD_FLCOOKIE)
-                oti->oti_logcookies = &body->oa.o_lcookie;
-
-        /* Finish the reply */
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-
-        /* Do the destroy and set the reply status accordingly  */
-        req->rq_status = obd_destroy(req->rq_svc_thread->t_env, exp,
-                                     &repbody->oa, NULL, oti, NULL, capa);
-        RETURN(0);
-}
-
-/**
- * Helper function for getting server side [start, start+count] DLM lock
- * if asked by client.
- */
-static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
-                        __u64 start, __u64 count, struct lustre_handle *lh,
-                       int mode, __u64 flags)
-{
-        struct ldlm_res_id res_id;
-        ldlm_policy_data_t policy;
-        __u64 end = start + count;
-
-        ENTRY;
-
-        LASSERT(!lustre_handle_is_used(lh));
-        /* o_id and o_gr are used for localizing resource, if client miss to set
-         * them, do not trigger ASSERTION. */
-        if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
-                     (OBD_MD_FLID | OBD_MD_FLGROUP)))
-                RETURN(-EPROTO);
-
-        if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
-            !(oa->o_flags & OBD_FL_SRVLOCK))
-                RETURN(0);
-
-       if (mode == LCK_MINMODE)
-               RETURN(0);
-
-       ostid_build_res_name(&oa->o_oi, &res_id);
-        CDEBUG(D_INODE, "OST-side extent lock.\n");
-
-        policy.l_extent.start = start & CFS_PAGE_MASK;
-
-        /* If ->o_blocks is EOF it means "lock till the end of the
-         * file". Otherwise, it's size of a hole being punched (in bytes) */
-        if (count == OBD_OBJECT_EOF || end < start)
-                policy.l_extent.end = OBD_OBJECT_EOF;
-        else
-                policy.l_extent.end = end | ~CFS_PAGE_MASK;
-
-        RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
-                                      LDLM_EXTENT, &policy, mode, &flags,
-                                      ldlm_blocking_ast, ldlm_completion_ast,
-                                     ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
-                                     NULL, lh));
-}
-
-/* Helper function: release lock, if any. */
-static void ost_lock_put(struct obd_export *exp,
-                         struct lustre_handle *lh, int mode)
-{
-        ENTRY;
-        if (lustre_handle_is_used(lh))
-                ldlm_lock_decref(lh, mode);
-        EXIT;
-}
-
-static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
-{
-        struct ost_body *body, *repbody;
-        struct obd_info *oinfo;
-        struct lustre_handle lh = { 0 };
-        struct lustre_capa *capa = NULL;
-       ldlm_mode_t lock_mode;
-        int rc;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
-
-        rc = ost_validate_obdo(exp, &body->oa, NULL);
-        if (rc)
-                RETURN(rc);
-
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
-                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
-                if (capa == NULL) {
-                        CERROR("Missing capability for OST GETATTR");
-                        RETURN(-EFAULT);
-                }
-        }
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
-
-       lock_mode = LCK_MINMODE;
-       if (body->oa.o_valid & OBD_MD_FLFLAGS &&
-           body->oa.o_flags & OBD_FL_SRVLOCK) {
-               lock_mode = LCK_PR;
-               if (body->oa.o_flags & OBD_FL_FLUSH)
-                       lock_mode = LCK_PW;
-       }
-       rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh,
-                         lock_mode, 0);
-       if (rc)
-               RETURN(rc);
-
-        OBD_ALLOC_PTR(oinfo);
-        if (!oinfo)
-                GOTO(unlock, rc = -ENOMEM);
-        oinfo->oi_oa = &repbody->oa;
-        oinfo->oi_capa = capa;
-
-        req->rq_status = obd_getattr(req->rq_svc_thread->t_env, exp, oinfo);
-
-        OBD_FREE_PTR(oinfo);
-
-        ost_drop_id(exp, &repbody->oa);
-
-       if (!(repbody->oa.o_valid & OBD_MD_FLFLAGS)) {
-               repbody->oa.o_valid |= OBD_MD_FLFLAGS;
-               repbody->oa.o_flags = 0;
-       }
-       repbody->oa.o_flags |= OBD_FL_FLUSH;
-
-unlock:
-       ost_lock_put(exp, &lh, lock_mode);
-       RETURN(rc);
-}
-
-static int ost_statfs(struct ptlrpc_request *req)
-{
-        struct obd_statfs *osfs;
-        int rc;
-        ENTRY;
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
-
-        req->rq_status = obd_statfs(req->rq_svc_thread->t_env, req->rq_export,
-                                    osfs,
-                                    cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
-                                    0);
-        if (req->rq_status != 0)
-                CERROR("ost: statfs failed: rc %d\n", req->rq_status);
-
-       if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
-               req->rq_status = -EINPROGRESS;
-
-        RETURN(0);
-}
-
-static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
-                      struct obd_trans_info *oti)
-{
-        struct ost_body *body, *repbody;
-        int rc;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
-
-        rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
-        if (rc)
-                RETURN(rc);
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
-        oti->oti_logcookies = &body->oa.o_lcookie;
-
-        req->rq_status = obd_create(req->rq_svc_thread->t_env, exp,
-                                    &repbody->oa, NULL, oti);
-        //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
-        RETURN(0);
-}
-
-static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
-                     struct obd_trans_info *oti)
-{
-        struct ost_body *body, *repbody;
-       __u64 flags = 0;
-        struct lustre_handle lh = {0,};
-       int rc;
-        ENTRY;
-
-        /* check that we do support OBD_CONNECT_TRUNCLOCK. */
-        CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
-
-        rc = ost_validate_obdo(exp, &body->oa, NULL);
-        if (rc)
-                RETURN(rc);
-
-        if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
-            (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
-                RETURN(-EPROTO);
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        /* standard truncate optimization: if file body is completely
-         * destroyed, don't send data back to the server. */
-        if (body->oa.o_size == 0)
-               flags |= LDLM_FL_AST_DISCARD_DATA;
-
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
-
-        rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size,
-                          repbody->oa.o_blocks, &lh, LCK_PW, flags);
-        if (rc == 0) {
-                struct obd_info *oinfo;
-                struct lustre_capa *capa = NULL;
-
-                if (repbody->oa.o_valid & OBD_MD_FLFLAGS &&
-                    repbody->oa.o_flags == OBD_FL_SRVLOCK)
-                        /*
-                         * If OBD_FL_SRVLOCK is the only bit set in
-                         * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
-                         * through filter_setattr() to filter_iocontrol().
-                         */
-                        repbody->oa.o_valid &= ~OBD_MD_FLFLAGS;
-
-                if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) {
-                        capa = req_capsule_client_get(&req->rq_pill,
-                                                      &RMF_CAPA1);
-                        if (capa == NULL) {
-                                CERROR("Missing capability for OST PUNCH");
-                                GOTO(unlock, rc = -EFAULT);
-                        }
-                }
-
-                OBD_ALLOC_PTR(oinfo);
-                if (!oinfo)
-                        GOTO(unlock, rc = -ENOMEM);
-                oinfo->oi_oa = &repbody->oa;
-                oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
-                oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
-                oinfo->oi_capa = capa;
-                oinfo->oi_flags = OBD_FL_PUNCH;
-
-                req->rq_status = obd_punch(req->rq_svc_thread->t_env, exp,
-                                           oinfo, oti, NULL);
-                OBD_FREE_PTR(oinfo);
-unlock:
-                ost_lock_put(exp, &lh, LCK_PW);
-        }
-
-        ost_drop_id(exp, &repbody->oa);
-        RETURN(rc);
-}
-
-static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req,
-                   struct obd_trans_info *oti)
-{
-        struct ost_body *body, *repbody;
-        struct obd_info *oinfo;
-        struct lustre_capa *capa = NULL;
-        int rc;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
-
-        rc = ost_validate_obdo(exp, &body->oa, NULL);
-        if (rc)
-                RETURN(rc);
-
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
-                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
-                if (capa == NULL) {
-                        CERROR("Missing capability for OST SYNC");
-                        RETURN (-EFAULT);
-                }
-        }
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
-
-        OBD_ALLOC_PTR(oinfo);
-        if (!oinfo)
-                RETURN(-ENOMEM);
-
-        oinfo->oi_oa = &repbody->oa;
-        oinfo->oi_capa = capa;
-       oinfo->oi_jobid = oti->oti_jobid;
-        req->rq_status = obd_sync(req->rq_svc_thread->t_env, exp, oinfo,
-                                  repbody->oa.o_size, repbody->oa.o_blocks,
-                                  NULL);
-        OBD_FREE_PTR(oinfo);
-
-        ost_drop_id(exp, &repbody->oa);
-        RETURN(0);
-}
-
-static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
-                       struct obd_trans_info *oti)
-{
-        struct ost_body *body, *repbody;
-        struct obd_info *oinfo;
-        struct lustre_capa *capa = NULL;
-        int rc;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                RETURN(-EFAULT);
-
-        rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
-        if (rc)
-                RETURN(rc);
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
-                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
-                if (capa == NULL) {
-                        CERROR("Missing capability for OST SETATTR");
-                        RETURN (-EFAULT);
-                }
-        }
-
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        repbody->oa = body->oa;
-
-        OBD_ALLOC_PTR(oinfo);
-        if (!oinfo)
-                RETURN(-ENOMEM);
-        oinfo->oi_oa = &repbody->oa;
-        oinfo->oi_capa = capa;
-
-        req->rq_status = obd_setattr(req->rq_svc_thread->t_env, exp, oinfo,
-                                     oti);
-
-        OBD_FREE_PTR(oinfo);
-
-        ost_drop_id(exp, &repbody->oa);
-        RETURN(0);
-}
-
-static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
-                              cksum_type_t cksum_type)
-{
-       struct cfs_crypto_hash_desc     *hdesc;
-       unsigned int                    bufsize;
-       int                             i, err;
-       unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
-       __u32                           cksum;
-
-       hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
-       if (IS_ERR(hdesc)) {
-               CERROR("Unable to initialize checksum hash %s\n",
-                      cfs_crypto_hash_name(cfs_alg));
-               return PTR_ERR(hdesc);
-       }
-       CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
-       for (i = 0; i < desc->bd_iov_count; i++) {
-
-               /* corrupt the data before we compute the checksum, to
-                * simulate a client->OST data error */
-               if (i == 0 && opc == OST_WRITE &&
-                   OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
-                       int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
-                       int len = desc->bd_iov[i].kiov_len;
-                       struct page *np = ost_page_to_corrupt;
-                       char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
-
-                       if (np) {
-                               char *ptr2 = kmap(np) + off;
-
-                               memcpy(ptr2, ptr, len);
-                               memcpy(ptr2, "bad3", min(4, len));
-                               kunmap(np);
-                               desc->bd_iov[i].kiov_page = np;
-                       } else {
-                               CERROR("can't alloc page for corruption\n");
-                       }
-               }
-               cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
-                                 desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK,
-                                 desc->bd_iov[i].kiov_len);
-
-                /* corrupt the data after we compute the checksum, to
-                * simulate an OST->client data error */
-               if (i == 0 && opc == OST_READ &&
-                   OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
-                       int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
-                       int len = desc->bd_iov[i].kiov_len;
-                       struct page *np = ost_page_to_corrupt;
-                       char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
-
-                       if (np) {
-                               char *ptr2 = kmap(np) + off;
-
-                               memcpy(ptr2, ptr, len);
-                               memcpy(ptr2, "bad4", min(4, len));
-                               kunmap(np);
-                               desc->bd_iov[i].kiov_page = np;
-                       } else {
-                               CERROR("can't alloc page for corruption\n");
-                       }
-               }
-       }
-
-       bufsize = 4;
-       err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
-       if (err)
-               cfs_crypto_hash_final(hdesc, NULL, NULL);
-
-       return cksum;
-}
-
-static int ost_brw_lock_get(int mode, struct obd_export *exp,
-                            struct obd_ioobj *obj, struct niobuf_remote *nb,
-                            struct lustre_handle *lh)
-{
-       __u64 flags               = 0;
-        int nrbufs                = obj->ioo_bufcnt;
-        struct ldlm_res_id res_id;
-        ldlm_policy_data_t policy;
-        int i;
-        ENTRY;
-
-       ostid_build_res_name(&obj->ioo_oid, &res_id);
-        LASSERT(mode == LCK_PR || mode == LCK_PW);
-        LASSERT(!lustre_handle_is_used(lh));
-
-        if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
-                RETURN(0);
-
-        for (i = 1; i < nrbufs; i ++)
-                if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
-                    (nb[i].flags & OBD_BRW_SRVLOCK))
-                        RETURN(-EFAULT);
-
-        policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
-        policy.l_extent.end   = (nb[nrbufs - 1].offset +
-                                 nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
-
-        RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
-                                      LDLM_EXTENT, &policy, mode, &flags,
-                                      ldlm_blocking_ast, ldlm_completion_ast,
-                                     ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
-                                     NULL, lh));
-}
-
-static void ost_brw_lock_put(int mode,
-                             struct obd_ioobj *obj, struct niobuf_remote *niob,
-                             struct lustre_handle *lh)
-{
-        ENTRY;
-        LASSERT(mode == LCK_PR || mode == LCK_PW);
-        LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
-                lustre_handle_is_used(lh));
-        if (lustre_handle_is_used(lh))
-                ldlm_lock_decref(lh, mode);
-        EXIT;
-}
-
-/* Allocate thread local buffers if needed */
-static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
-{
-        struct ost_thread_local_cache *tls =
-                (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-
-        /* In normal mode of operation an I/O request is serviced only
-         * by ll_ost_io threads each of them has own tls buffers allocated by
-         * ost_io_thread_init().
-         * During recovery, an I/O request may be queued until any of the ost
-         * service threads process it. Not necessary it should be one of
-         * ll_ost_io threads. In that case we dynamically allocating tls
-         * buffers for the request service time. */
-        if (unlikely(tls == NULL)) {
-                LASSERT(r->rq_export->exp_in_recovery);
-                OBD_ALLOC_PTR(tls);
-                if (tls != NULL) {
-                        tls->temporary = 1;
-                        r->rq_svc_thread->t_data = tls;
-                }
-        }
-        return  tls;
-}
-
-/* Free thread local buffers if they were allocated only for servicing
- * this one request */
-static void ost_tls_put(struct ptlrpc_request *r)
-{
-        struct ost_thread_local_cache *tls =
-                (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-
-        if (unlikely(tls->temporary)) {
-                OBD_FREE_PTR(tls);
-                r->rq_svc_thread->t_data = NULL;
-        }
-}
-
-static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
-{
-        struct ptlrpc_bulk_desc *desc = NULL;
-        struct obd_export *exp = req->rq_export;
-        struct niobuf_remote *remote_nb;
-        struct niobuf_local *local_nb;
-        struct obd_ioobj *ioo;
-        struct ost_body *body, *repbody;
-        struct lustre_capa *capa = NULL;
-        struct l_wait_info lwi;
-        struct lustre_handle lockh = { 0 };
-        int niocount, npages, nob = 0, rc, i;
-        int no_reply = 0;
-        struct ost_thread_local_cache *tls;
-        ENTRY;
-
-        req->rq_bulk_read = 1;
-
-        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
-                GOTO(out, rc = -EIO);
-
-        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
-
-        /* Check if there is eviction in progress, and if so, wait for it to
-         * finish */
-        if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
-                lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
-                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
-                        !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
-                        &lwi);
-        }
-        if (exp->exp_failed)
-                GOTO(out, rc = -ENOTCONN);
-
-        /* ost_body, ioobj & noibuf_remote are verified and swabbed in
-         * ost_rw_hpreq_check(). */
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                GOTO(out, rc = -EFAULT);
-
-        /*
-         * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
-         * would be useful here and wherever we get &RMF_OBD_IOOBJ and
-         * &RMF_NIOBUF_REMOTE.
-         */
-        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-        if (ioo == NULL)
-                GOTO(out, rc = -EFAULT);
-
-        rc = ost_validate_obdo(exp, &body->oa, ioo);
-        if (rc)
-                RETURN(rc);
-
-        niocount = ioo->ioo_bufcnt;
-        remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-        if (remote_nb == NULL)
-                GOTO(out, rc = -EFAULT);
-
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
-                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
-                if (capa == NULL) {
-                        CERROR("Missing capability for OST BRW READ");
-                        GOTO(out, rc = -EFAULT);
-                }
-        }
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                GOTO(out, rc);
-
-        tls = ost_tls_get(req);
-        if (tls == NULL)
-                GOTO(out_bulk, rc = -ENOMEM);
-        local_nb = tls->local;
-
-        rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
-        if (rc != 0)
-                GOTO(out_tls, rc);
-
-       /*
-        * If getting the lock took more time than
-        * client was willing to wait, drop it. b=11330
-        */
-       if (cfs_time_current_sec() > req->rq_deadline ||
-           OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
-               no_reply = 1;
-               CERROR("Dropping timed-out read from %s because locking"
-                      "object "DOSTID" took %ld seconds (limit was %ld).\n",
-                      libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
-                      cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
-                      req->rq_deadline - req->rq_arrival_time.tv_sec);
-               GOTO(out_lock, rc = -ETIMEDOUT);
-       }
-
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
-
-        npages = OST_THREAD_POOL_SIZE;
-        rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
-                        &repbody->oa, 1, ioo, remote_nb, &npages, local_nb,
-                        oti, capa);
-        if (rc != 0)
-                GOTO(out_lock, rc);
-
-       desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
-                                   BULK_PUT_SOURCE, OST_BULK_PORTAL);
-       if (desc == NULL)
-               GOTO(out_commitrw, rc = -ENOMEM);
-
-        nob = 0;
-        for (i = 0; i < npages; i++) {
-                int page_rc = local_nb[i].rc;
-
-                if (page_rc < 0) {              /* error */
-                        rc = page_rc;
-                        break;
-                }
-
-                nob += page_rc;
-                if (page_rc != 0) {             /* some data! */
-                        LASSERT (local_nb[i].page != NULL);
-                       ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
-                                                   local_nb[i].lnb_page_offset,
-                                                   page_rc);
-                }
-
-                if (page_rc != local_nb[i].len) { /* short read */
-                        /* All subsequent pages should be 0 */
-                        while(++i < npages)
-                                LASSERT(local_nb[i].rc == 0);
-                        break;
-                }
-        }
-
-        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
-                cksum_type_t cksum_type =
-                        cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ?
-                                          repbody->oa.o_flags : 0);
-                repbody->oa.o_flags = cksum_type_pack(cksum_type);
-                repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
-                CDEBUG(D_PAGE, "checksum at read origin: %x\n",
-                       repbody->oa.o_cksum);
-        } else {
-                repbody->oa.o_valid = 0;
-        }
-        /* We're finishing using body->oa as an input variable */
-
-        /* Check if client was evicted while we were doing i/o before touching
-           network */
-        if (rc == 0) {
-                if (likely(!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)))
-                        rc = target_bulk_io(exp, desc, &lwi);
-                no_reply = rc != 0;
-        }
-
-out_commitrw:
-        /* Must commit after prep above in all cases */
-        rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
-                          &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
-                          oti, rc);
-
-        if (rc == 0)
-                ost_drop_id(exp, &repbody->oa);
-
-out_lock:
-        ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
-out_tls:
-        ost_tls_put(req);
-out_bulk:
-        if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
-               ptlrpc_free_bulk_nopin(desc);
-out:
-        LASSERT(rc <= 0);
-        if (rc == 0) {
-                req->rq_status = nob;
-                ptlrpc_lprocfs_brw(req, nob);
-                target_committed_to_req(req);
-                ptlrpc_reply(req);
-        } else if (!no_reply) {
-                /* Only reply if there was no comms problem with bulk */
-                target_committed_to_req(req);
-                req->rq_status = rc;
-                ptlrpc_error(req);
-        } else {
-                /* reply out callback would free */
-                ptlrpc_req_drop_rs(req);
-                LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
-                              "client will retry: rc %d\n",
-                              exp->exp_obd->obd_name,
-                              obd_uuid2str(&exp->exp_client_uuid),
-                              obd_export_nid2str(exp), rc);
-        }
-        /* send a bulk after reply to simulate a network delay or reordering
-         * by a router */
-       if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
-               wait_queue_head_t              waitq;
-               struct l_wait_info       lwi1;
-
-               CDEBUG(D_INFO, "reorder BULK\n");
-               init_waitqueue_head(&waitq);
-
-               lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
-               l_wait_event(waitq, 0, &lwi1);
-               rc = target_bulk_io(exp, desc, &lwi);
-               ptlrpc_free_bulk_nopin(desc);
-       }
-
-        RETURN(rc);
-}
-
-static void ost_warn_on_cksum(struct ptlrpc_request *req,
-                             struct ptlrpc_bulk_desc *desc,
-                             struct niobuf_local *local_nb, int npages,
-                             obd_count client_cksum, obd_count server_cksum,
-                             int mmap)
-{
-       struct obd_export *exp = req->rq_export;
-       struct ost_body *body;
-       char *router;
-       char *via;
-
-       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-       LASSERT (body != NULL);
-
-       if (req->rq_peer.nid == desc->bd_sender) {
-               via = router = "";
-       } else {
-               via = " via ";
-               router = libcfs_nid2str(desc->bd_sender);
-       }
-
-       if (mmap) {
-               CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n",
-                            client_cksum, server_cksum);
-               return;
-       }
-
-       LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode "
-                          DFID" object "DOSTID" extent ["LPU64"-"LPU64
-                          "]: client csum %x, server csum %x\n",
-                          exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer),
-                          via, router,
-                          body->oa.o_valid & OBD_MD_FLFID ?
-                          body->oa.o_parent_seq : (__u64)0,
-                          body->oa.o_valid & OBD_MD_FLFID ?
-                          body->oa.o_parent_oid : 0,
-                          body->oa.o_valid & OBD_MD_FLFID ?
-                          body->oa.o_parent_ver : 0,
-                          POSTID(&body->oa.o_oi),
-                          local_nb[0].lnb_file_offset,
-                          local_nb[npages-1].lnb_file_offset +
-                          local_nb[npages-1].len - 1,
-                          client_cksum, server_cksum);
-}
-
-static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
-{
-        struct ptlrpc_bulk_desc *desc = NULL;
-        struct obd_export       *exp = req->rq_export;
-        struct niobuf_remote    *remote_nb;
-        struct niobuf_local     *local_nb;
-        struct obd_ioobj        *ioo;
-        struct ost_body         *body, *repbody;
-        struct l_wait_info       lwi;
-        struct lustre_handle     lockh = {0};
-        struct lustre_capa      *capa = NULL;
-        __u32                   *rcs;
-        int objcount, niocount, npages;
-        int rc, i, j;
-        obd_count                client_cksum = 0, server_cksum = 0;
-        cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
-        int                      no_reply = 0, mmap = 0;
-        __u32                    o_uid = 0, o_gid = 0;
-        struct ost_thread_local_cache *tls;
-        ENTRY;
-
-        req->rq_bulk_write = 1;
-
-        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
-                GOTO(out, rc = -EIO);
-        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
-                GOTO(out, rc = -EFAULT);
-
-        /* pause before transaction has been started */
-        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
-
-        /* ost_body, ioobj & noibuf_remote are verified and swabbed in
-         * ost_rw_hpreq_check(). */
-        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-        if (body == NULL)
-                GOTO(out, rc = -EFAULT);
-
-        objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
-                                        RCL_CLIENT) / sizeof(*ioo);
-        ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
-        if (ioo == NULL)
-                GOTO(out, rc = -EFAULT);
-
-        rc = ost_validate_obdo(exp, &body->oa, ioo);
-        if (rc)
-                RETURN(rc);
-
-        for (niocount = i = 0; i < objcount; i++)
-                niocount += ioo[i].ioo_bufcnt;
-
-        /*
-         * It'd be nice to have a capsule function to indicate how many elements
-         * there were in a buffer for an RMF that's declared to be an array.
-         * It's easy enough to compute the number of elements here though.
-         */
-        remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
-        if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
-            &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
-                GOTO(out, rc = -EFAULT);
-
-        if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
-            (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
-               memory_pressure_set();
-
-        if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
-                capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
-                if (capa == NULL) {
-                        CERROR("Missing capability for OST BRW WRITE");
-                        GOTO(out, rc = -EFAULT);
-                }
-        }
-
-        req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
-                             niocount * sizeof(*rcs));
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc != 0)
-                GOTO(out, rc);
-        CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
-        rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
-
-        tls = ost_tls_get(req);
-        if (tls == NULL)
-                GOTO(out_bulk, rc = -ENOMEM);
-        local_nb = tls->local;
-
-        rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
-        if (rc != 0)
-                GOTO(out_tls, rc);
-
-       /*
-        * If getting the lock took more time than
-        * client was willing to wait, drop it. b=11330
-        */
-       if (cfs_time_current_sec() > req->rq_deadline ||
-           OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
-               no_reply = 1;
-               CERROR("Dropping timed-out write from %s because locking "
-                      "object "DOSTID" took %ld seconds (limit was %ld).\n",
-                      libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
-                      cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
-                      req->rq_deadline - req->rq_arrival_time.tv_sec);
-               GOTO(out_lock, rc = -ETIMEDOUT);
-       }
-
-        /* obd_preprw clobbers oa->valid, so save what we need */
-        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
-                client_cksum = body->oa.o_cksum;
-                if (body->oa.o_valid & OBD_MD_FLFLAGS)
-                        cksum_type = cksum_type_unpack(body->oa.o_flags);
-        }
-        if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
-                mmap = 1;
-
-        /* Because we already sync grant info with client when reconnect,
-         * grant info will be cleared for resent req, then fed_grant and
-         * total_grant will not be modified in following preprw_write */
-        if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
-                DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
-                body->oa.o_valid &= ~OBD_MD_FLGRANT;
-        }
-
-        if (exp_connect_rmtclient(exp)) {
-                o_uid = body->oa.o_uid;
-                o_gid = body->oa.o_gid;
-        }
-
-        repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
-        memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
-
-        npages = OST_THREAD_POOL_SIZE;
-        rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
-                        &repbody->oa, objcount, ioo, remote_nb, &npages,
-                        local_nb, oti, capa);
-        if (rc != 0)
-                GOTO(out_lock, rc);
-
-       desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
-                                   BULK_GET_SINK, OST_BULK_PORTAL);
-       if (desc == NULL)
-               GOTO(skip_transfer, rc = -ENOMEM);
-
-       /* NB Having prepped, we must commit... */
-       for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
-                                           local_nb[i].lnb_page_offset,
-                                           local_nb[i].len);
-
-        rc = sptlrpc_svc_prep_bulk(req, desc);
-        if (rc != 0)
-                GOTO(out_lock, rc);
-
-        rc = target_bulk_io(exp, desc, &lwi);
-        no_reply = rc != 0;
-
-skip_transfer:
-        if (client_cksum != 0 && rc == 0) {
-                static int cksum_counter;
-                repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
-                repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
-                repbody->oa.o_flags |= cksum_type_pack(cksum_type);
-                server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
-                repbody->oa.o_cksum = server_cksum;
-                cksum_counter++;
-                if (unlikely(client_cksum != server_cksum)) {
-                       ost_warn_on_cksum(req, desc, local_nb, npages,
-                                         client_cksum, server_cksum, mmap);
-                        cksum_counter = 0;
-
-                } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
-                        CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
-                               cksum_counter, libcfs_id2str(req->rq_peer),
-                               server_cksum);
-                }
-        }
-
-        /* Must commit after prep above in all cases */
-        rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
-                          &repbody->oa, objcount, ioo, remote_nb, npages,
-                          local_nb, oti, rc);
-        if (rc == -ENOTCONN)
-                /* quota acquire process has been given up because
-                 * either the client has been evicted or the client
-                 * has timed out the request already */
-                no_reply = 1;
-
-        if (exp_connect_rmtclient(exp)) {
-                repbody->oa.o_uid = o_uid;
-                repbody->oa.o_gid = o_gid;
-        }
-
-        /*
-         * Disable sending mtime back to the client. If the client locked the
-         * whole object, then it has already updated the mtime on its side,
-         * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
-         */
-        repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
-
-        if (rc == 0) {
-                int nob = 0;
-
-                /* set per-requested niobuf return codes */
-                for (i = j = 0; i < niocount; i++) {
-                        int len = remote_nb[i].len;
-
-                        nob += len;
-                        rcs[i] = 0;
-                        do {
-                                LASSERT(j < npages);
-                                if (local_nb[j].rc < 0)
-                                        rcs[i] = local_nb[j].rc;
-                                len -= local_nb[j].len;
-                                j++;
-                        } while (len > 0);
-                        LASSERT(len == 0);
-                }
-                LASSERT(j == npages);
-                ptlrpc_lprocfs_brw(req, nob);
-        }
-
-out_lock:
-        ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
-out_tls:
-        ost_tls_put(req);
-out_bulk:
-        if (desc)
-               ptlrpc_free_bulk_nopin(desc);
-out:
-        if (rc == 0) {
-                oti_to_request(oti, req);
-                target_committed_to_req(req);
-                rc = ptlrpc_reply(req);
-        } else if (!no_reply) {
-                /* Only reply if there was no comms problem with bulk */
-                target_committed_to_req(req);
-                req->rq_status = rc;
-                ptlrpc_error(req);
-        } else {
-                /* reply out callback would free */
-                ptlrpc_req_drop_rs(req);
-                LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
-                              "client will retry: rc %d\n",
-                              exp->exp_obd->obd_name,
-                              obd_uuid2str(&exp->exp_client_uuid),
-                              obd_export_nid2str(exp), rc);
-        }
-       memory_pressure_clr();
-        RETURN(rc);
-}
-
-/**
- * Implementation of OST_SET_INFO.
- *
- * OST_SET_INFO is like ioctl(): heavily overloaded.  Specifically, it takes a
- * "key" and a value RPC buffers as arguments, with the value's contents
- * interpreted according to the key.
- *
- * Value types that need swabbing have swabbing done explicitly, either here or
- * in functions called from here.  This should be corrected: all swabbing should
- * be done in the capsule abstraction, as that will then allow us to move
- * swabbing exclusively to the client without having to modify server code
- * outside the capsule abstraction's implementation itself.  To correct this
- * will require minor changes to the capsule abstraction; see the comments for
- * req_capsule_extend() in layout.c.
- */
-static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
-{
-        struct ost_body *body = NULL, *repbody;
-        char *key, *val = NULL;
-        int keylen, vallen, rc = 0;
-        int is_grant_shrink = 0;
-        ENTRY;
-
-        key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
-        if (key == NULL) {
-                DEBUG_REQ(D_HA, req, "no set_info key");
-                RETURN(-EFAULT);
-        }
-        keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
-                                      RCL_CLIENT);
-
-        vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
-                                      RCL_CLIENT);
-
-        if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
-                /* In this case the value is actually an RMF_OST_BODY, so we
-                 * transmutate the type of this PTLRPC */
-                req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(rc);
-
-        if (vallen) {
-                if (is_grant_shrink) {
-                        body = req_capsule_client_get(&req->rq_pill,
-                                                      &RMF_OST_BODY);
-                        if (!body)
-                                RETURN(-EFAULT);
-
-                        repbody = req_capsule_server_get(&req->rq_pill,
-                                                         &RMF_OST_BODY);
-                        memcpy(repbody, body, sizeof(*body));
-                        val = (char*)repbody;
-                } else {
-                        val = req_capsule_client_get(&req->rq_pill,
-                                                     &RMF_SETINFO_VAL);
-                }
-        }
-
-        if (KEY_IS(KEY_EVICT_BY_NID)) {
-                if (val && vallen)
-                        obd_export_evict_by_nid(exp->exp_obd, val);
-                GOTO(out, rc = 0);
-        } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
-                if (vallen < sizeof(__u32))
-                        RETURN(-EFAULT);
-                __swab32s((__u32 *)val);
-        }
-
-        /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
-         * a struct ost_body * value */
-        rc = obd_set_info_async(req->rq_svc_thread->t_env, exp, keylen,
-                                key, vallen, val, NULL);
-out:
-        lustre_msg_set_status(req->rq_repmsg, 0);
-        RETURN(rc);
-}
-
-struct locked_region {
-       cfs_list_t  list;
-       struct lustre_handle lh;
-};
-
-static int lock_region(struct obd_export *exp, struct obdo *oa,
-                      unsigned long long begin, unsigned long long end,
-                      cfs_list_t *locked)
-{
-       struct locked_region *region = NULL;
-       int rc;
-
-       LASSERT(begin <= end);
-       OBD_ALLOC_PTR(region);
-       if (region == NULL)
-               return -ENOMEM;
-
-       rc = ost_lock_get(exp, oa, begin, end - begin, &region->lh, LCK_PR, 0);
-       if (rc) {
-               OBD_FREE_PTR(region);
-               return rc;
-       }
-
-       CDEBUG(D_OTHER, "ost lock [%llu,%llu], lh=%p\n",
-              begin, end, &region->lh);
-       cfs_list_add(&region->list, locked);
-
-       return 0;
-}
-
-static int lock_zero_regions(struct obd_export *exp, struct obdo *oa,
-                            struct ll_user_fiemap *fiemap,
-                            cfs_list_t *locked)
+/**
+ * Validate oa from client.
+ * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
+ * req are valid.
+ *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
+ *    b. Echo objects (seq = 2): old echo clients still use oi_id/oi_seq to
+ *       pack ost_id. Because a non-zero oi_seq makes it difficult to tell
+ *       whether this is an oi_fid or a real ostid, the code checks
+ *       OBD_CONNECT_FID and converts the ostid to a FID for old clients.
+ *    c. Old FID-disabled osc will send IDIF.
+ *    d. New FID-enabled osc/osp will send normal FID.
+ *
+ * Also, oi_id/f_oid should always start from 1. oi_id/f_oid = 0 is
+ * used for the LAST_ID file, which is only accessed inside the OST now.
+ */
+static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
+                            struct obd_ioobj *ioobj)
 {
-       __u64 begin = fiemap->fm_start;
-       unsigned int i;
        int rc = 0;
-       struct ll_fiemap_extent *fiemap_start = fiemap->fm_extents;
-       ENTRY;
-
-       CDEBUG(D_OTHER, "extents count %u\n", fiemap->fm_mapped_extents);
-       for (i = 0; i < fiemap->fm_mapped_extents; i++) {
-               if (fiemap_start[i].fe_logical > begin) {
-                       CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
-                              begin, fiemap_start[i].fe_logical);
-                       rc = lock_region(exp, oa, begin,
-                                   fiemap_start[i].fe_logical, locked);
-                       if (rc)
-                               RETURN(rc);
-               }
-
-               begin = fiemap_start[i].fe_logical + fiemap_start[i].fe_length;
-       }
-
-       if (begin < (fiemap->fm_start + fiemap->fm_length)) {
-               CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
-                      begin, fiemap->fm_start + fiemap->fm_length);
-               rc = lock_region(exp, oa, begin,
-                                fiemap->fm_start + fiemap->fm_length, locked);
-       }
-
-       RETURN(rc);
-}
-
-static void unlock_zero_regions(struct obd_export *exp, cfs_list_t *locked)
-{
-       struct locked_region *entry, *temp;
-       cfs_list_for_each_entry_safe(entry, temp, locked, list) {
-               CDEBUG(D_OTHER, "ost unlock lh=%p\n", &entry->lh);
-               ost_lock_put(exp, &entry->lh, LCK_PR);
-               cfs_list_del(&entry->list);
-               OBD_FREE_PTR(entry);
-       }
-}
-
-static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
-{
-        void *key, *reply;
-        int keylen, replylen, rc = 0;
-        struct req_capsule *pill = &req->rq_pill;
-       cfs_list_t locked = CFS_LIST_HEAD_INIT(locked);
-       struct ll_fiemap_info_key *fm_key = NULL;
-       struct ll_user_fiemap *fiemap;
-        ENTRY;
-
-        /* this common part for get_info rpc */
-        key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
-        if (key == NULL) {
-                DEBUG_REQ(D_HA, req, "no get_info key");
-                RETURN(-EFAULT);
-        }
-        keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
 
-        if (KEY_IS(KEY_FIEMAP)) {
-               fm_key = key;
-                rc = ost_validate_obdo(exp, &fm_key->oa, NULL);
-                if (rc)
-                        RETURN(rc);
-       }
-
-        rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
-                          &replylen, NULL, NULL);
-        if (rc)
-               RETURN(rc);
-
-        req_capsule_set_size(pill, &RMF_GENERIC_DATA,
-                             RCL_SERVER, replylen);
+       if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
+                    fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
+               /* Sigh 2.[123] client still sends echo req with oi_id = 0
+                * during create, and we will reset this to 1, since this
+                * oi_id is basically useless in the following create process,
+                * but oi_id == 0 will make it difficult to tell whether it is
+                * real FID or ost_id. */
+               oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
+               oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
+               oa->o_oi.oi_fid.f_ver = 0;
+       } else {
+               if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
+                       GOTO(out, rc = -EPROTO);
 
-        rc = req_capsule_server_pack(pill);
-        if (rc)
-               RETURN(rc);
-
-        reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
-        if (reply == NULL)
-               RETURN(-ENOMEM);
-
-       if (KEY_IS(KEY_LAST_FID)) {
-               void *val;
-               int vallen;
-
-               req_capsule_extend(pill, &RQF_OST_GET_INFO_LAST_FID);
-               val = req_capsule_client_get(pill, &RMF_SETINFO_VAL);
-               vallen = req_capsule_get_size(pill, &RMF_SETINFO_VAL,
-                                             RCL_CLIENT);
-               if (val != NULL && vallen > 0 && replylen >= vallen) {
-                       memcpy(reply, val, vallen);
-               } else {
-                       CERROR("%s: invalid req val %p vallen %d replylen %d\n",
-                              exp->exp_obd->obd_name, val, vallen, replylen);
-                       RETURN(-EINVAL);
+               /* Note: this check might be forced in 2.5 or 2.6, i.e.
+                * all of the requests are required to setup FLGROUP */
+               if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
+                       ostid_set_seq_mdt0(&oa->o_oi);
+                       if (ioobj)
+                               ostid_set_seq_mdt0(&ioobj->ioo_oid);
+                       oa->o_valid |= OBD_MD_FLGROUP;
                }
-       }
 
-       /* call again to fill in the reply buffer */
-       rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
-                         &replylen, reply, NULL);
-
-       /* LU-3219: Lock the sparse areas to make sure dirty flushed back
-        * from client, then call fiemap again. */
-       if (KEY_IS(KEY_FIEMAP) && (fm_key->oa.o_valid & OBD_MD_FLFLAGS) &&
-           (fm_key->oa.o_flags & OBD_FL_SRVLOCK)) {
-               fiemap = (struct ll_user_fiemap *)reply;
-               fm_key = key;
-
-               rc = lock_zero_regions(exp, &fm_key->oa, fiemap, &locked);
-               if (rc == 0 && !cfs_list_empty(&locked))
-                       rc = obd_get_info(req->rq_svc_thread->t_env, exp,
-                                         keylen, key, &replylen, reply, NULL);
-               unlock_zero_regions(exp, &locked);
-               if (rc)
-                       RETURN(rc);
+               if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
+                              fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
+                              fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
+                              fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
+                       GOTO(out, rc = -EPROTO);
        }
 
-       lustre_msg_set_status(req->rq_repmsg, 0);
-
-        RETURN(rc);
-}
-
-static int ost_handle_quotactl(struct ptlrpc_request *req)
-{
-        struct obd_quotactl *oqctl, *repoqc;
-        int rc;
-        ENTRY;
-
-        oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
-        if (oqctl == NULL)
-                GOTO(out, rc = -EPROTO);
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                GOTO(out, rc);
-
-        repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
-        req->rq_status = obd_quotactl(req->rq_export, oqctl);
-        *repoqc = *oqctl;
-
-out:
-        RETURN(rc);
-}
-
-static int ost_handle_quotacheck(struct ptlrpc_request *req)
-{
-        struct obd_quotactl *oqctl;
-        int rc;
-        ENTRY;
-
-        oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
-        if (oqctl == NULL)
-                RETURN(-EPROTO);
-
-        rc = req_capsule_server_pack(&req->rq_pill);
-        if (rc)
-                RETURN(-ENOMEM);
-
-       /* deprecated, not used any more */
-       req->rq_status = -EOPNOTSUPP;
-       RETURN(-EOPNOTSUPP);
-}
-
-static int ost_llog_handle_connect(struct obd_export *exp,
-                                   struct ptlrpc_request *req)
-{
-        struct llogd_conn_body *body;
-        int rc;
-        ENTRY;
-
-        body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
-        rc = obd_llog_connect(exp, body);
-        RETURN(rc);
-}
-
-#define ost_init_sec_none(reply)                                       \
-do {                                                                   \
-       reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |          \
-                                     OBD_CONNECT_RMT_CLIENT_FORCE |    \
-                                     OBD_CONNECT_OSS_CAPA);            \
-} while (0)
-
-static int ost_init_sec_level(struct ptlrpc_request *req)
-{
-        struct obd_export *exp = req->rq_export;
-        struct req_capsule *pill = &req->rq_pill;
-        struct obd_device *obd = exp->exp_obd;
-        struct filter_obd *filter = &obd->u.filter;
-        char *client = libcfs_nid2str(req->rq_peer.nid);
-        struct obd_connect_data *data, *reply;
-        int rc = 0, remote;
-        ENTRY;
-
-        data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
-        reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
-        if (data == NULL || reply == NULL)
-                RETURN(-EFAULT);
-
-        /* connection from MDT is always trusted */
-        if (req->rq_auth_usr_mdt) {
-               ost_init_sec_none(reply);
-                RETURN(0);
-        }
-
-        /* no GSS support case */
-        if (!req->rq_auth_gss) {
-                if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
-                        CWARN("client %s -> target %s does not user GSS, "
-                              "can not run under security level %d.\n",
-                              client, obd->obd_name, filter->fo_sec_level);
-                        RETURN(-EACCES);
-                } else {
-                       ost_init_sec_none(reply);
-                        RETURN(0);
-                }
-        }
-
-        /* old version case */
-        if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
-                     !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
-                if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
-                        CWARN("client %s -> target %s uses old version, "
-                              "can not run under security level %d.\n",
-                              client, obd->obd_name, filter->fo_sec_level);
-                        RETURN(-EACCES);
-                } else {
-                        CWARN("client %s -> target %s uses old version, "
-                              "run under security level %d.\n",
-                              client, obd->obd_name, filter->fo_sec_level);
-                       ost_init_sec_none(reply);
-                        RETURN(0);
-                }
-        }
-
-        remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
-        if (remote) {
-                if (!req->rq_auth_remote)
-                        CDEBUG(D_SEC, "client (local realm) %s -> target %s "
-                               "asked to be remote.\n", client, obd->obd_name);
-        } else if (req->rq_auth_remote) {
-                remote = 1;
-                CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
-                       "as remote by default.\n", client, obd->obd_name);
-        }
-
-        if (remote) {
-                if (!filter->fo_fl_oss_capa) {
-                        CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
-                               " but OSS capabilities are not enabled: %d.\n",
-                               client, obd->obd_name, filter->fo_fl_oss_capa);
-                        RETURN(-EACCES);
-                }
-        }
-
-        switch (filter->fo_sec_level) {
-        case LUSTRE_SEC_NONE:
-                if (!remote) {
-                       ost_init_sec_none(reply);
-                        break;
-                } else {
-                        CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
-                               "can not run under security level %d.\n",
-                               client, obd->obd_name, filter->fo_sec_level);
-                        RETURN(-EACCES);
-                }
-        case LUSTRE_SEC_REMOTE:
-                if (!remote)
-                       ost_init_sec_none(reply);
-                break;
-        case LUSTRE_SEC_ALL:
-                if (!remote) {
-                        reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
-                                                      OBD_CONNECT_RMT_CLIENT_FORCE);
-                        if (!filter->fo_fl_oss_capa)
-                                reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
-                }
-                break;
-        default:
-                RETURN(-EINVAL);
-        }
-
-        RETURN(rc);
-}
-
-/*
- * FIXME
- * this should be done in filter_connect()/filter_reconnect(), but
- * we can't obtain information like NID, which stored in incoming
- * request, thus can't decide what flavor to use. so we do it here.
- *
- * This hack should be removed after the OST stack be rewritten, just
- * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
- */
-static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
-{
-        struct obd_export     *exp = req->rq_export;
-        struct filter_obd     *filter = &exp->exp_obd->u.filter;
-        struct sptlrpc_flavor  flvr;
-        int                    rc = 0;
-
-        if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
-                            LUSTRE_ECHO_NAME) == 0)) {
-                exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
-                return 0;
-        }
-
-        if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
-               read_lock(&filter->fo_sptlrpc_lock);
-               sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
-                                            req->rq_sp_from,
-                                            req->rq_peer.nid,
-                                            &flvr);
-               read_unlock(&filter->fo_sptlrpc_lock);
-
-               spin_lock(&exp->exp_lock);
-
-                exp->exp_sp_peer = req->rq_sp_from;
-                exp->exp_flvr = flvr;
-
-                if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
-                    exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
-                        CERROR("unauthorized rpc flavor %x from %s, "
-                               "expect %x\n", req->rq_flvr.sf_rpc,
-                               libcfs_nid2str(req->rq_peer.nid),
-                               exp->exp_flvr.sf_rpc);
-                        rc = -EACCES;
-                }
-
-               spin_unlock(&exp->exp_lock);
-        } else {
-                if (exp->exp_sp_peer != req->rq_sp_from) {
-                        CERROR("RPC source %s doesn't match %s\n",
-                               sptlrpc_part2name(req->rq_sp_from),
-                               sptlrpc_part2name(exp->exp_sp_peer));
-                        rc = -EACCES;
-                } else {
-                        rc = sptlrpc_target_export_check(exp, req);
-                }
-        }
-
-        return rc;
-}
-
-/* Ensure that data and metadata are synced to the disk when lock is cancelled
- * (if requested) */
-int ost_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
-                    void *data, int flag)
-{
-       struct lu_env   env;
-       __u32           sync_lock_cancel = 0;
-       __u32           len = sizeof(sync_lock_cancel);
-       int             rc = 0;
-
-       ENTRY;
+       if (ioobj != NULL) {
+               unsigned max_brw = ioobj_max_brw_get(ioobj);
 
-       rc = lu_env_init(&env, LCT_DT_THREAD);
-       if (unlikely(rc != 0))
-               RETURN(rc);
-
-       rc = obd_get_info(&env, lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
-                         KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
-       if (rc == 0 && flag == LDLM_CB_CANCELING &&
-           (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
-           (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
-            (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
-             lock->l_flags & LDLM_FL_CBPENDING))) {
-               struct obd_info *oinfo;
-               struct obdo     *oa;
-               int              rc;
-
-               OBD_ALLOC_PTR(oinfo);
-               if (!oinfo)
-                       GOTO(out_env, rc = -ENOMEM);
-               OBDO_ALLOC(oa);
-               if (!oa) {
-                       OBD_FREE_PTR(oinfo);
-                       GOTO(out_env, rc = -ENOMEM);
+               if (unlikely((max_brw & (max_brw - 1)) != 0)) {
+                       CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
+                              ": rc = -EPROTO\n", exp->exp_obd->obd_name,
+                              obd_export_nid2str(exp), max_brw,
+                              POSTID(&oa->o_oi));
+                       GOTO(out, rc = -EPROTO);
                }
-
-               ostid_res_name_to_id(&oa->o_oi, &lock->l_resource->lr_name);
-               oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
-               oinfo->oi_oa = oa;
-               oinfo->oi_capa = BYPASS_CAPA;
-
-               rc = obd_sync(&env, lock->l_export, oinfo,
-                             lock->l_policy_data.l_extent.start,
-                             lock->l_policy_data.l_extent.end, NULL);
-               if (rc)
-                       CERROR("Error %d syncing data on lock cancel\n", rc);
-
-               OBDO_FREE(oa);
-               OBD_FREE_PTR(oinfo);
+               ioobj->ioo_oid = oa->o_oi;
        }
 
-       rc = ldlm_server_blocking_ast(lock, desc, data, flag);
-out_env:
-       lu_env_fini(&env);
-       RETURN(rc);
-}
-
-static int ost_filter_recovery_request(struct ptlrpc_request *req,
-                                       struct obd_device *obd, int *process)
-{
-        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
-        case OST_CONNECT: /* This will never get here, but for completeness. */
-        case OST_DISCONNECT:
-               *process = 1;
-               RETURN(0);
-
-        case OBD_PING:
-        case OST_CREATE:
-        case OST_DESTROY:
-        case OST_PUNCH:
-        case OST_SETATTR:
-        case OST_SYNC:
-        case OST_WRITE:
-        case OBD_LOG_CANCEL:
-        case LDLM_ENQUEUE:
-                *process = target_queue_recovery_request(req, obd);
-                RETURN(0);
-
-        default:
-                DEBUG_REQ(D_WARNING, req, "not permitted during recovery");
-                *process = -EAGAIN;
-                RETURN(0);
-        }
-}
-
-int ost_msg_check_version(struct lustre_msg *msg)
-{
-        int rc;
-
-        switch(lustre_msg_get_opc(msg)) {
-        case OST_CONNECT:
-        case OST_DISCONNECT:
-        case OBD_PING:
-        case SEC_CTX_INIT:
-        case SEC_CTX_INIT_CONT:
-        case SEC_CTX_FINI:
-                rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
-                if (rc)
-                        CERROR("bad opc %u version %08x, expecting %08x\n",
-                               lustre_msg_get_opc(msg),
-                               lustre_msg_get_version(msg),
-                               LUSTRE_OBD_VERSION);
-                break;
-        case OST_CREATE:
-        case OST_DESTROY:
-        case OST_GETATTR:
-        case OST_SETATTR:
-        case OST_WRITE:
-        case OST_READ:
-        case OST_PUNCH:
-        case OST_STATFS:
-        case OST_SYNC:
-        case OST_SET_INFO:
-        case OST_GET_INFO:
-        case OST_QUOTACHECK:
-        case OST_QUOTACTL:
-                rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
-                if (rc)
-                        CERROR("bad opc %u version %08x, expecting %08x\n",
-                               lustre_msg_get_opc(msg),
-                               lustre_msg_get_version(msg),
-                               LUSTRE_OST_VERSION);
-                break;
-        case LDLM_ENQUEUE:
-        case LDLM_CONVERT:
-        case LDLM_CANCEL:
-        case LDLM_BL_CALLBACK:
-        case LDLM_CP_CALLBACK:
-                rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
-                if (rc)
-                        CERROR("bad opc %u version %08x, expecting %08x\n",
-                               lustre_msg_get_opc(msg),
-                               lustre_msg_get_version(msg),
-                               LUSTRE_DLM_VERSION);
-                break;
-        case LLOG_ORIGIN_CONNECT:
-        case OBD_LOG_CANCEL:
-                rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
-                if (rc)
-                        CERROR("bad opc %u version %08x, expecting %08x\n",
-                               lustre_msg_get_opc(msg),
-                               lustre_msg_get_version(msg),
-                               LUSTRE_LOG_VERSION);
-                break;
-       case OST_QUOTA_ADJUST_QUNIT:
-               rc = -ENOTSUPP;
-               CERROR("Quota adjust is deprecated as of 2.4.0\n");
-               break;
-        default:
-                CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
-                rc = -ENOTSUPP;
-        }
-        return rc;
+out:
+       if (rc != 0)
+               CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
+                      exp->exp_obd->obd_name, obd_export_nid2str(exp),
+                      oa ? ostid_seq(&oa->o_oi) : -1,
+                      oa ? ostid_id(&oa->o_oi) : -1, rc);
+       return rc;
 }
 
 struct ost_prolong_data {
@@ -2277,330 +534,6 @@ static int ost_io_hpreq_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
-/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
-int ost_handle(struct ptlrpc_request *req)
-{
-       struct obd_trans_info trans_info = { 0, };
-       struct obd_trans_info *oti = &trans_info;
-       int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
-       struct obd_device *obd = NULL;
-       __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
-       ENTRY;
-
-       /* OST module is kept between remounts, but the last reference
-        * to specific module (say, osd or ofd) kills all related keys
-        * from the environment. so we have to refill it until the root
-        * cause is fixed properly */
-       lu_env_refill(req->rq_svc_thread->t_env);
-
-       LASSERT(current->journal_info == NULL);
-
-       /* primordial rpcs don't affect server recovery */
-       switch (opc) {
-       case SEC_CTX_INIT:
-       case SEC_CTX_INIT_CONT:
-       case SEC_CTX_FINI:
-               GOTO(out, rc = 0);
-       }
-
-       req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-
-       if (opc != OST_CONNECT) {
-               if (!class_connected_export(req->rq_export)) {
-                       CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
-                              opc, libcfs_id2str(req->rq_peer));
-                        req->rq_status = -ENOTCONN;
-                        GOTO(out, rc = -ENOTCONN);
-                }
-
-                obd = req->rq_export->exp_obd;
-
-                /* Check for aborted recovery. */
-                if (obd->obd_recovering) {
-                        rc = ost_filter_recovery_request(req, obd,
-                                                         &should_process);
-                        if (rc || !should_process)
-                                RETURN(rc);
-                        else if (should_process < 0) {
-                                req->rq_status = should_process;
-                                rc = ptlrpc_error(req);
-                                RETURN(rc);
-                        }
-                }
-        }
-
-        oti_init(oti, req);
-
-        rc = ost_msg_check_version(req->rq_reqmsg);
-        if (rc)
-                RETURN(rc);
-
-       if (req && req->rq_reqmsg && req->rq_export &&
-           (exp_connect_flags(req->rq_export) & OBD_CONNECT_JOBSTATS))
-               oti->oti_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
-
-       switch (opc) {
-        case OST_CONNECT: {
-                CDEBUG(D_INODE, "connect\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
-                        RETURN(0);
-                rc = target_handle_connect(req);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
-                        RETURN(0);
-                if (!rc) {
-                        rc = ost_init_sec_level(req);
-                        if (!rc)
-                                rc = ost_connect_check_sptlrpc(req);
-                }
-               if (rc == 0) {
-                       struct obd_export *exp = req->rq_export;
-                       struct obd_connect_data *reply;
-                       /* Now that connection handling has completed
-                        * successfully, atomically update the connect flags
-                        * in the shared export data structure.*/
-                       reply = req_capsule_server_get(&req->rq_pill,
-                                                      &RMF_CONNECT_DATA);
-                       spin_lock(&exp->exp_lock);
-                       exp->exp_connect_data = *reply;
-                       spin_unlock(&exp->exp_lock);
-               }
-                break;
-        }
-        case OST_DISCONNECT:
-                CDEBUG(D_INODE, "disconnect\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
-                        RETURN(0);
-                rc = target_handle_disconnect(req);
-                break;
-        case OST_CREATE:
-                CDEBUG(D_INODE, "create\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
-                        RETURN(0);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
-                        GOTO(out, rc = -EROFS);
-                rc = ost_create(req->rq_export, req, oti);
-                break;
-        case OST_DESTROY:
-                CDEBUG(D_INODE, "destroy\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
-                        RETURN(0);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
-                        GOTO(out, rc = -EROFS);
-                rc = ost_destroy(req->rq_export, req, oti);
-                break;
-        case OST_GETATTR:
-                CDEBUG(D_INODE, "getattr\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
-                        RETURN(0);
-                rc = ost_getattr(req->rq_export, req);
-                break;
-        case OST_SETATTR:
-                CDEBUG(D_INODE, "setattr\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
-                        RETURN(0);
-                rc = ost_setattr(req->rq_export, req, oti);
-                break;
-        case OST_WRITE:
-                req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
-                CDEBUG(D_INODE, "write\n");
-                /* req->rq_request_portal would be nice, if it was set */
-               if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
-                       CERROR("%s: deny write request from %s to portal %u\n",
-                              req->rq_export->exp_obd->obd_name,
-                              obd_export_nid2str(req->rq_export),
-                              ptlrpc_req2svc(req)->srv_req_portal);
-                        GOTO(out, rc = -EPROTO);
-                }
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
-                        RETURN(0);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
-                        GOTO(out, rc = -ENOSPC);
-                if (OBD_FAIL_TIMEOUT(OBD_FAIL_OST_EROFS, 1))
-                        GOTO(out, rc = -EROFS);
-                rc = ost_brw_write(req, oti);
-                LASSERT(current->journal_info == NULL);
-                /* ost_brw_write sends its own replies */
-                RETURN(rc);
-        case OST_READ:
-                req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
-                CDEBUG(D_INODE, "read\n");
-                /* req->rq_request_portal would be nice, if it was set */
-               if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
-                       CERROR("%s: deny read request from %s to portal %u\n",
-                              req->rq_export->exp_obd->obd_name,
-                              obd_export_nid2str(req->rq_export),
-                              ptlrpc_req2svc(req)->srv_req_portal);
-                        GOTO(out, rc = -EPROTO);
-                }
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
-                        RETURN(0);
-                rc = ost_brw_read(req, oti);
-                LASSERT(current->journal_info == NULL);
-                /* ost_brw_read sends its own replies */
-                RETURN(rc);
-        case OST_PUNCH:
-                CDEBUG(D_INODE, "punch\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
-                        RETURN(0);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
-                        GOTO(out, rc = -EROFS);
-                rc = ost_punch(req->rq_export, req, oti);
-                break;
-        case OST_STATFS:
-                CDEBUG(D_INODE, "statfs\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
-                        RETURN(0);
-                rc = ost_statfs(req);
-                break;
-        case OST_SYNC:
-                CDEBUG(D_INODE, "sync\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
-                        RETURN(0);
-               rc = ost_sync(req->rq_export, req, oti);
-                break;
-        case OST_SET_INFO:
-                DEBUG_REQ(D_INODE, req, "set_info");
-                req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
-                rc = ost_set_info(req->rq_export, req);
-                break;
-        case OST_GET_INFO:
-                DEBUG_REQ(D_INODE, req, "get_info");
-                req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
-                rc = ost_get_info(req->rq_export, req);
-                break;
-        case OST_QUOTACHECK:
-                CDEBUG(D_INODE, "quotacheck\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
-                        RETURN(0);
-                rc = ost_handle_quotacheck(req);
-                break;
-        case OST_QUOTACTL:
-                CDEBUG(D_INODE, "quotactl\n");
-                req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
-                if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
-                        RETURN(0);
-                rc = ost_handle_quotactl(req);
-                break;
-        case OBD_PING:
-                DEBUG_REQ(D_INODE, req, "ping");
-                req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
-                rc = target_handle_ping(req);
-                break;
-        /* FIXME - just reply status */
-        case LLOG_ORIGIN_CONNECT:
-                DEBUG_REQ(D_INODE, req, "log connect");
-                req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
-                rc = ost_llog_handle_connect(req->rq_export, req);
-                req->rq_status = rc;
-                rc = req_capsule_server_pack(&req->rq_pill);
-                if (rc)
-                        RETURN(rc);
-                RETURN(ptlrpc_reply(req));
-       case LDLM_ENQUEUE:
-               CDEBUG(D_INODE, "enqueue\n");
-               req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
-               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_NET))
-                       RETURN(0);
-               rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
-                                        ost_blocking_ast,
-                                        ldlm_server_glimpse_ast);
-               fail = OBD_FAIL_OST_LDLM_REPLY_NET;
-               break;
-       case LDLM_CONVERT:
-               CDEBUG(D_INODE, "convert\n");
-               req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
-               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT_NET))
-                       RETURN(0);
-               rc = ldlm_handle_convert(req);
-               break;
-       case LDLM_CANCEL:
-               CDEBUG(D_INODE, "cancel\n");
-               req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
-               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET))
-                       RETURN(0);
-               rc = ldlm_handle_cancel(req);
-               break;
-        case LDLM_BL_CALLBACK:
-        case LDLM_CP_CALLBACK:
-                CDEBUG(D_INODE, "callback\n");
-                CERROR("callbacks should not happen on OST\n");
-                /* fall through */
-        default:
-               CERROR("Unexpected opcode %d\n", opc);
-                req->rq_status = -ENOTSUPP;
-                rc = ptlrpc_error(req);
-                RETURN(rc);
-        }
-
-        LASSERT(current->journal_info == NULL);
-
-        EXIT;
-        /* If we're DISCONNECTing, the export_data is already freed */
-       if (!rc && opc != OST_DISCONNECT)
-                target_committed_to_req(req);
-
-out:
-        if (!rc)
-                oti_to_request(oti, req);
-
-        target_send_reply(req, rc, fail);
-        return 0;
-}
-EXPORT_SYMBOL(ost_handle);
-
-/*
- * free per-thread pool created by ost_io_thread_init().
- */
-static void ost_io_thread_done(struct ptlrpc_thread *thread)
-{
-        struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
-                                             * Storage */
-
-        ENTRY;
-
-        LASSERT(thread != NULL);
-
-        /*
-         * be prepared to handle partially-initialized pools (because this is
-         * called from ost_io_thread_init() for cleanup.
-         */
-        tls = thread->t_data;
-        if (tls != NULL) {
-                OBD_FREE_PTR(tls);
-                thread->t_data = NULL;
-        }
-        EXIT;
-}
-
-/*
- * initialize per-thread page pool (bug 5137).
- */
-static int ost_io_thread_init(struct ptlrpc_thread *thread)
-{
-        struct ost_thread_local_cache *tls;
-
-        ENTRY;
-
-        LASSERT(thread != NULL);
-        LASSERT(thread->t_data == NULL);
-
-        OBD_ALLOC_PTR(tls);
-        if (tls == NULL)
-                RETURN(-ENOMEM);
-        thread->t_data = tls;
-        RETURN(0);
-}
-
 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
 
 static struct cfs_cpt_table    *ost_io_cptable;
@@ -2649,7 +582,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                        .cc_pattern             = oss_cpts,
                },
                .psc_ops                = {
-                       .so_req_handler         = ost_handle,
+                       .so_req_handler         = tgt_request_handle,
                        .so_req_printer         = target_print_req,
                        .so_hpreq_handler       = ptlrpc_hpreq_handler,
                },
@@ -2688,7 +621,7 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                        .cc_pattern             = oss_cpts,
                },
                .psc_ops                = {
-                       .so_req_handler         = ost_handle,
+                       .so_req_handler         = tgt_request_handle,
                        .so_req_printer         = target_print_req,
                },
        };
@@ -2753,9 +686,9 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                                                  oss_io_cpts : NULL,
                },
                .psc_ops                = {
-                       .so_thr_init            = ost_io_thread_init,
-                       .so_thr_done            = ost_io_thread_done,
-                       .so_req_handler         = ost_handle,
+                       .so_thr_init            = tgt_io_thread_init,
+                       .so_thr_done            = tgt_io_thread_done,
+                       .so_req_handler         = tgt_request_handle,
                        .so_hpreq_handler       = ost_io_hpreq_handler,
                        .so_req_printer         = target_print_req,
                },
@@ -2810,7 +743,6 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                GOTO(out_io, rc);
        }
 
-#if 0
        /* Object update service */
        memset(&svc_conf, 0, sizeof(svc_conf));
        svc_conf = (typeof(svc_conf)) {
@@ -2855,10 +787,13 @@ static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
                ost->ost_out_service = NULL;
                GOTO(out_seq, rc);
        }
-#endif
+
        ping_evictor_start();
 
        RETURN(0);
+out_seq:
+       ptlrpc_unregister_service(ost->ost_seq_service);
+       ost->ost_seq_service = NULL;
 out_io:
        ptlrpc_unregister_service(ost->ost_io_service);
        ost->ost_io_service = NULL;
@@ -2889,9 +824,8 @@ static int ost_cleanup(struct obd_device *obd)
        ptlrpc_unregister_service(ost->ost_create_service);
        ptlrpc_unregister_service(ost->ost_io_service);
        ptlrpc_unregister_service(ost->ost_seq_service);
-#if 0
        ptlrpc_unregister_service(ost->ost_out_service);
-#endif
+
        ost->ost_service = NULL;
        ost->ost_create_service = NULL;
        ost->ost_io_service = NULL;
@@ -2931,11 +865,6 @@ static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
         return rc;
 }
 
-struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
-{
-        return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-}
-
 /* use obd ops to offer management infrastructure */
 static struct obd_ops ost_obd_ops = {
         .o_owner        = THIS_MODULE,
@@ -2947,11 +876,10 @@ static struct obd_ops ost_obd_ops = {
 
 static int __init ost_init(void)
 {
-        struct lprocfs_static_vars lvars;
-        int rc;
-        ENTRY;
+       struct lprocfs_static_vars lvars;
+       int rc;
 
-       ost_page_to_corrupt = alloc_page(GFP_IOFS);
+       ENTRY;
 
         lprocfs_ost_init_vars(&lvars);
         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
@@ -2969,10 +897,7 @@ static int __init ost_init(void)
 
 static void /*__exit*/ ost_exit(void)
 {
-       if (ost_page_to_corrupt)
-               page_cache_release(ost_page_to_corrupt);
-
-        class_unregister_type(LUSTRE_OSS_NAME);
+       class_unregister_type(LUSTRE_OSS_NAME);
 }
 
 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
index d07b8ea..8b475a1 100644 (file)
 
 #define OSS_SERVICE_WATCHDOG_FACTOR 2
 
-/*
- * tunables for per-thread page pool (bug 5137)
- */
-#define OST_THREAD_POOL_SIZE PTLRPC_MAX_BRW_PAGES  /* pool size in pages */
-#define OST_THREAD_POOL_GFP  GFP_HIGHUSER    /* GFP mask for pool pages */
-
-struct page;
-struct niobuf_local;
-struct niobuf_remote;
-struct ptlrpc_request;
-
-/*
- * struct ost_thread_local_cache is allocated and initialized for each OST
- * thread by ost_thread_init().
- */
-struct ost_thread_local_cache {
-        /*
-         * pool of nio buffers used by write-path
-         */
-        struct niobuf_local   local[OST_THREAD_POOL_SIZE];
-        unsigned int          temporary:1;
-};
-
-struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r);
-
 #ifdef LPROCFS
 void lprocfs_ost_init_vars(struct lprocfs_static_vars *lvars);
 #else
index c793bb4..5b3b648 100644 (file)
@@ -610,7 +610,7 @@ static const struct req_msg_field *ost_get_info_generic_server[] = {
 
 static const struct req_msg_field *ost_get_info_generic_client[] = {
         &RMF_PTLRPC_BODY,
-        &RMF_SETINFO_KEY
+       &RMF_GETINFO_KEY
 };
 
 static const struct req_msg_field *ost_get_last_id_server[] = {
@@ -618,6 +618,12 @@ static const struct req_msg_field *ost_get_last_id_server[] = {
         &RMF_OBD_ID
 };
 
+static const struct req_msg_field *ost_get_last_fid_client[] = {
+       &RMF_PTLRPC_BODY,
+       &RMF_GETINFO_KEY,
+       &RMF_FID,
+};
+
 static const struct req_msg_field *ost_get_last_fid_server[] = {
        &RMF_PTLRPC_BODY,
        &RMF_FID,
@@ -742,7 +748,7 @@ static struct req_format *req_formats[] = {
         &RQF_OST_BRW_WRITE,
         &RQF_OST_STATFS,
         &RQF_OST_SET_GRANT_INFO,
-        &RQF_OST_GET_INFO_GENERIC,
+       &RQF_OST_GET_INFO,
         &RQF_OST_GET_INFO_LAST_ID,
        &RQF_OST_GET_INFO_LAST_FID,
        &RQF_OST_SET_INFO_LAST_FID,
@@ -1617,10 +1623,10 @@ struct req_format RQF_OST_SET_GRANT_INFO =
                          ost_body_only);
 EXPORT_SYMBOL(RQF_OST_SET_GRANT_INFO);
 
-struct req_format RQF_OST_GET_INFO_GENERIC =
+struct req_format RQF_OST_GET_INFO =
         DEFINE_REQ_FMT0("OST_GET_INFO", ost_get_info_generic_client,
                                         ost_get_info_generic_server);
-EXPORT_SYMBOL(RQF_OST_GET_INFO_GENERIC);
+EXPORT_SYMBOL(RQF_OST_GET_INFO);
 
 struct req_format RQF_OST_GET_INFO_LAST_ID =
         DEFINE_REQ_FMT0("OST_GET_INFO_LAST_ID", ost_get_info_generic_client,
@@ -1628,7 +1634,7 @@ struct req_format RQF_OST_GET_INFO_LAST_ID =
 EXPORT_SYMBOL(RQF_OST_GET_INFO_LAST_ID);
 
 struct req_format RQF_OST_GET_INFO_LAST_FID =
-       DEFINE_REQ_FMT0("OST_GET_INFO_LAST_FID", obd_set_info_client,
+       DEFINE_REQ_FMT0("OST_GET_INFO_LAST_FID", ost_get_last_fid_client,
                                                 ost_get_last_fid_server);
 EXPORT_SYMBOL(RQF_OST_GET_INFO_LAST_FID);
 
index e3e63db..6889115 100644 (file)
@@ -36,6 +36,7 @@
 
 #include <obd.h>
 #include <obd_class.h>
+#include <obd_cksum.h>
 
 #include "tgt_internal.h"
 
@@ -109,6 +110,147 @@ static int tgt_mdt_body_unpack(struct tgt_session_info *tsi, __u32 flags)
        RETURN(rc);
 }
 
+/**
+ * Validate oa from client.
+ * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
+ * req are valid.
+ *    a. objects in Single MDT FS  seq = FID_SEQ_OST_MDT0, oi_id != 0
+ *    b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
+ *       pack ost_id. Because non-zero oi_seq will make it difficult to tell
+ *       whether this is oi_fid or real ostid. So it will check
+ *       OBD_CONNECT_FID, then convert the ostid to FID for old client.
+ *    c. Old FID-disable osc will send IDIF.
+ *    d. new FID-enable osc/osp will send normal FID.
+ *
+ * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
+ * be used for LAST_ID file, and only being accessed inside OST now.
+ *
+ * \param[in]     tsi  session info, supplies the export and the target name
+ * \param[in,out] oa   obdo sent by the client; may be fixed up in place
+ *
+ * \retval 0        \a oa is acceptable
+ * \retval -EPROTO  OBD_MD_FLID is set with a zero object id, or the
+ *                  sequence is not IDIF, MDT0, normal or echo
+ */
+int tgt_validate_obdo(struct tgt_session_info *tsi, struct obdo *oa)
+{
+       int rc;
+
+       ENTRY;
+
+       if (unlikely(!(exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_FID) &&
+                    fid_seq_is_echo(oa->o_oi.oi.oi_seq))) {
+               /* Sigh 2.[123] client still sends echo req with oi_id = 0
+                * during create, and we will reset this to 1, since this
+                * oi_id is basically useless in the following create process,
+                * but oi_id == 0 will make it difficult to tell whether it is
+                * real FID or ost_id. */
+               oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
+               oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
+               oa->o_oi.oi_fid.f_ver = 0;
+       } else {
+               if (unlikely((oa->o_valid & OBD_MD_FLID &&
+                             ostid_id(&oa->o_oi) == 0)))
+                       GOTO(out, rc = -EPROTO);
+
+               /* Note: this check might be forced in 2.5 or 2.6, i.e.
+                * all of the requests are required to setup FLGROUP */
+               if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
+                       ostid_set_seq_mdt0(&oa->o_oi);
+                       oa->o_valid |= OBD_MD_FLGROUP;
+               }
+
+               if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
+                              fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
+                              fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
+                              fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
+                       GOTO(out, rc = -EPROTO);
+       }
+       RETURN(0);
+out:
+       CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
+              tgt_name(tsi->tsi_tgt), obd_export_nid2str(tsi->tsi_exp),
+              ostid_seq(&oa->o_oi), ostid_id(&oa->o_oi), rc);
+       /* leave through RETURN() so that the ENTRY above is always paired */
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_validate_obdo);
+
+/*
+ * Unpack the OST body of a client request into the session info.
+ *
+ * Validates the obdo, checks that a capability is present when
+ * OBD_MD_FLOSSCAPA is set, sanity-checks the ioobj/niobuf fields when the
+ * request format carries them and, if OBD_MD_FLID is set, fills tsi_fid
+ * and tsi_resid.  A request without OBD_MD_FLID is rejected with -EPROTO
+ * only when the handler was registered with HABEO_CORPUS.
+ */
+static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
+{
+       struct req_capsule      *pill = tsi->tsi_pill;
+       struct ost_body         *body;
+       int                      rc;
+
+       ENTRY;
+
+       body = req_capsule_client_get(pill, &RMF_OST_BODY);
+       if (body == NULL)
+               RETURN(-EFAULT);
+
+       rc = tgt_validate_obdo(tsi, &body->oa);
+       if (rc)
+               RETURN(rc);
+
+       /* the capability buffer must exist whenever the flag announces it */
+       if ((body->oa.o_valid & OBD_MD_FLOSSCAPA) &&
+           req_capsule_client_get(pill, &RMF_CAPA1) == NULL) {
+               CERROR("%s: OSSCAPA flag is set without capability\n",
+                      tgt_name(tsi->tsi_tgt));
+               RETURN(-EFAULT);
+       }
+
+       tsi->tsi_ost_body = body;
+
+       if (req_capsule_has_field(pill, &RMF_OBD_IOOBJ, RCL_CLIENT)) {
+               struct obd_ioobj        *obj;
+               unsigned                 brw;
+
+               obj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
+               if (obj == NULL)
+                       RETURN(-EPROTO);
+
+               if (req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE) == NULL)
+                       RETURN(-EPROTO);
+
+               /* reject a maximum that has more than one bit set */
+               brw = ioobj_max_brw_get(obj);
+               if (unlikely((brw & (brw - 1)) != 0)) {
+                       CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
+                              ": rc = %d\n", tgt_name(tsi->tsi_tgt),
+                              obd_export_nid2str(tsi->tsi_exp), brw,
+                              POSTID(&body->oa.o_oi), -EPROTO);
+                       RETURN(-EPROTO);
+               }
+               obj->ioo_oid = body->oa.o_oi;
+       }
+
+       if (!(body->oa.o_valid & OBD_MD_FLID)) {
+               /* an object id is optional unless the handler demands one */
+               if (!(flags & HABEO_CORPUS))
+                       RETURN(0);
+
+               CERROR("%s: OBD_MD_FLID flag is not set in ost_body "
+                      "but OID/FID is mandatory with HABEO_CORPUS\n",
+                      tgt_name(tsi->tsi_tgt));
+               RETURN(-EPROTO);
+       }
+
+       rc = ostid_to_fid(&tsi->tsi_fid, &body->oa.o_oi, 0);
+       if (rc != 0)
+               RETURN(rc);
+
+       if (!fid_is_sane(&tsi->tsi_fid)) {
+               CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt),
+                      PFID(&tsi->tsi_fid));
+               RETURN(-EINVAL);
+       }
+
+       ost_fid_build_resid(&tsi->tsi_fid, &tsi->tsi_resid);
+
+       /*
+        * The object itself is deliberately not looked up here: doing so
+        * could lead to nested object_find calls, a potential deadlock.
+        */
+       tsi->tsi_corpus = NULL;
+       RETURN(0);
+}
+
 static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
 {
        struct req_capsule      *pill = tsi->tsi_pill;
@@ -118,11 +260,13 @@ static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
 
        if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
                rc = tgt_mdt_body_unpack(tsi, flags);
+       } else if (req_capsule_has_field(pill, &RMF_OST_BODY, RCL_CLIENT)) {
+               rc = tgt_ost_body_unpack(tsi, flags);
        } else {
                rc = 0;
        }
 
-       if (flags & HABEO_REFERO) {
+       if (rc == 0 && flags & HABEO_REFERO) {
                /* Pack reply */
                if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
                        req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
@@ -242,11 +386,6 @@ static int tgt_handle_request0(struct tgt_session_info *tsi,
 
        LASSERT(current->journal_info == NULL);
 
-       /*
-        * If we're DISCONNECTing, the export_data is already freed
-        *
-        * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT))
-        */
        if (likely(rc == 0 && req->rq_export))
                target_committed_to_req(req);
 
@@ -272,6 +411,12 @@ static int tgt_filter_recovery_request(struct ptlrpc_request *req,
        case SEQ_QUERY:
        case FLD_QUERY:
        case LDLM_ENQUEUE:
+       case OST_CREATE:
+       case OST_DESTROY:
+       case OST_PUNCH:
+       case OST_SETATTR:
+       case OST_SYNC:
+       case OST_WRITE:
                *process = target_queue_recovery_request(req, obd);
                RETURN(0);
 
@@ -374,6 +519,10 @@ int tgt_request_handle(struct ptlrpc_request *req)
                        rc = ptlrpc_error(req);
                        GOTO(out, rc);
                }
+               /* recovery-small test 18c asks to drop connect reply */
+               if (unlikely(opc == OST_CONNECT &&
+                            OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2)))
+                       GOTO(out, rc = 0);
        }
 
        if (unlikely(!class_connected_export(req->rq_export))) {
@@ -386,6 +535,10 @@ int tgt_request_handle(struct ptlrpc_request *req)
 
        tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
        tsi->tsi_exp = req->rq_export;
+       if (exp_connect_flags(req->rq_export) & OBD_CONNECT_JOBSTATS)
+               tsi->tsi_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
+       else
+               tsi->tsi_jobid = NULL;
 
        request_fail_id = tgt->lut_request_fail_id;
        tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
@@ -445,6 +598,8 @@ out:
        tsi->tsi_env = NULL;
        tsi->tsi_mdt_body = NULL;
        tsi->tsi_dlm_req = NULL;
+       fid_zero(&tsi->tsi_fid);
+       memset(&tsi->tsi_resid, 0, sizeof tsi->tsi_resid);
        return rc;
 }
 EXPORT_SYMBOL(tgt_request_handle);
@@ -637,6 +792,30 @@ int tgt_connect_check_sptlrpc(struct ptlrpc_request *req, struct obd_export *exp
        return rc;
 }
 
+/*
+ * Reload the sptlrpc rule set of a target.
+ *
+ * Fetches the rules for the target's obd into a temporary set, updates the
+ * export flavors against it and then installs it as lut_sptlrpc_rset under
+ * lut_sptlrpc_lock, freeing the previous set.
+ *
+ * Returns 0 on success or the error from fetching the rules.
+ */
+int tgt_adapt_sptlrpc_conf(struct lu_target *tgt, int initial)
+{
+       struct sptlrpc_rule_set  rules;
+       int                      err;
+
+       sptlrpc_rule_set_init(&rules);
+       err = sptlrpc_conf_target_get_rules(tgt->lut_obd, &rules, initial);
+       if (err != 0) {
+               CERROR("%s: failed get sptlrpc rules: rc = %d\n",
+                      tgt_name(tgt), err);
+               return err;
+       }
+
+       sptlrpc_target_update_exp_flavor(tgt->lut_obd, &rules);
+
+       /* swap the new rules in while holding the write lock */
+       write_lock(&tgt->lut_sptlrpc_lock);
+       sptlrpc_rule_set_free(&tgt->lut_sptlrpc_rset);
+       tgt->lut_sptlrpc_rset = rules;
+       write_unlock(&tgt->lut_sptlrpc_lock);
+
+       return 0;
+}
+EXPORT_SYMBOL(tgt_adapt_sptlrpc_conf);
+
 int tgt_connect(struct tgt_session_info *tsi)
 {
        struct ptlrpc_request   *req = tgt_ses_req(tsi);
@@ -848,12 +1027,78 @@ TGT_OBD_HDL    (0,       OBD_IDX_READ,           tgt_obd_idx_read)
 };
 EXPORT_SYMBOL(tgt_obd_handlers);
 
+/*
+ * Sync either one object or the whole bottom device.
+ *
+ * With obj == NULL the whole device is synced.  Otherwise the object is
+ * synced only if its version is greater than obd_last_committed.
+ * Returns 0 or the error from the sync call.
+ */
+int tgt_sync(const struct lu_env *env, struct lu_target *tgt,
+            struct dt_object *obj)
+{
+       ENTRY;
+
+       /* no object given: "sync whole filesystem" */
+       if (obj == NULL)
+               RETURN(dt_sync(env, tgt->lut_bottom));
+
+       /* version not newer than the last committed one: nothing to do */
+       if (dt_version_get(env, obj) <= tgt->lut_obd->obd_last_committed)
+               RETURN(0);
+
+       RETURN(dt_object_sync(env, obj));
+}
+EXPORT_SYMBOL(tgt_sync);
 /*
  * Unified target DLM handlers.
  */
+
+/* Ensure that data and metadata are synced to the disk when lock is cancelled
+ * (if requested).
+ *
+ * The sync is attempted only when \a flag is LDLM_CB_CANCELING, the granted
+ * mode includes LCK_PW or LCK_GROUP, and lut_sync_lock_cancel is either
+ * ALWAYS_SYNC_ON_CANCEL, or BLOCKING_SYNC_ON_CANCEL with LDLM_FL_CBPENDING
+ * set on the lock.  The object is located from the lock's resource name.
+ *
+ * If lu_env_init() fails its error is returned directly and
+ * ldlm_server_blocking_ast() is not called.  In every other case, including
+ * a failed lookup or sync, the value returned is that of
+ * ldlm_server_blocking_ast(); a sync failure is only logged. */
+int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                    void *data, int flag)
+{
+       struct lu_env            env;
+       struct lu_target        *tgt;
+       struct dt_object        *obj;
+       struct lu_fid            fid;
+       int                      rc = 0;
+
+       ENTRY;
+
+       tgt = class_exp2tgt(lock->l_export);
+
+       if (flag == LDLM_CB_CANCELING &&
+           (lock->l_granted_mode & (LCK_PW | LCK_GROUP)) &&
+           (tgt->lut_sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
+            (tgt->lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
+             lock->l_flags & LDLM_FL_CBPENDING))) {
+               rc = lu_env_init(&env, LCT_DT_THREAD);
+               if (unlikely(rc != 0))
+                       RETURN(rc);
+
+               ost_fid_from_resid(&fid, &lock->l_resource->lr_name);
+               obj = dt_locate(&env, tgt->lut_bottom, &fid);
+               if (IS_ERR(obj))
+                       GOTO(err_env, rc = PTR_ERR(obj));
+
+               if (!dt_object_exists(obj))
+                       GOTO(err_put, rc = -ENOENT);
+
+               rc = tgt_sync(&env, tgt, obj);
+               if (rc < 0) {
+                       CERROR("%s: sync failed on lock cancel: rc = %d\n",
+                              tgt_name(tgt), rc);
+               }
+err_put:
+               lu_object_put(&env, &obj->do_lu);
+err_env:
+               lu_env_fini(&env);
+       }
+
+       rc = ldlm_server_blocking_ast(lock, desc, data, flag);
+       RETURN(rc);
+}
+
 struct ldlm_callback_suite tgt_dlm_cbs = {
        .lcs_completion = ldlm_server_completion_ast,
-       .lcs_blocking   = ldlm_server_blocking_ast,
+       .lcs_blocking   = tgt_blocking_ast,
        .lcs_glimpse    = ldlm_server_glimpse_ast
 };
 
@@ -1015,3 +1260,670 @@ TGT_SEC_HDL_VAR(0,      SEC_CTX_INIT_CONT,      tgt_sec_ctx_handle),
 TGT_SEC_HDL_VAR(0,     SEC_CTX_FINI,           tgt_sec_ctx_handle),
 };
 EXPORT_SYMBOL(tgt_sec_ctx_handlers);
+
+/*
+ * Allocate the per-thread big cache and attach it to the service thread
+ * as t_data (bug 5137).  The thread must not have t_data set already.
+ *
+ * Returns 0 on success or -ENOMEM if the allocation fails.
+ */
+int tgt_io_thread_init(struct ptlrpc_thread *thread)
+{
+       struct tgt_thread_big_cache *cache;
+
+       ENTRY;
+
+       LASSERT(thread != NULL);
+       LASSERT(thread->t_data == NULL);
+
+       OBD_ALLOC_LARGE(cache, sizeof(*cache));
+       if (cache == NULL)
+               RETURN(-ENOMEM);
+
+       thread->t_data = cache;
+       RETURN(0);
+}
+EXPORT_SYMBOL(tgt_io_thread_init);
+
+/*
+ * Free the per-thread cache attached by tgt_io_thread_init(), if any,
+ * and clear t_data.
+ */
+void tgt_io_thread_done(struct ptlrpc_thread *thread)
+{
+       ENTRY;
+
+       LASSERT(thread != NULL);
+
+       /*
+        * t_data may be NULL when the thread was only partially
+        * initialized, so tolerate being called for cleanup in that state.
+        */
+       if (thread->t_data != NULL) {
+               struct tgt_thread_big_cache *cache = thread->t_data;
+
+               OBD_FREE_LARGE(cache, sizeof(*cache));
+               thread->t_data = NULL;
+       }
+       EXIT;
+}
+EXPORT_SYMBOL(tgt_io_thread_done);
+/**
+ * Helper function for getting a server side [start, end] extent DLM lock
+ * if asked by client.
+ *
+ * \a start is masked with CFS_PAGE_MASK and \a end is OR-ed with its
+ * complement before enqueueing.  An \a end of OBD_OBJECT_EOF, or one that
+ * lies before \a start, means "lock till the end of the file".
+ *
+ * \retval 0     lock was granted, handle stored in \a lh
+ * \retval -EIO  the local enqueue did not return ELDLM_OK
+ */
+int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+                   __u64 start, __u64 end, struct lustre_handle *lh,
+                   int mode, __u64 *flags)
+{
+       ldlm_policy_data_t       extent;
+       int                      err;
+
+       ENTRY;
+
+       LASSERT(lh != NULL);
+       LASSERT(ns != NULL);
+       LASSERT(!lustre_handle_is_used(lh));
+
+       extent.l_extent.gid = 0;
+       extent.l_extent.start = start & CFS_PAGE_MASK;
+       extent.l_extent.end = (end == OBD_OBJECT_EOF || end < start) ?
+                             OBD_OBJECT_EOF : (end | ~CFS_PAGE_MASK);
+
+       err = ldlm_cli_enqueue_local(ns, res_id, LDLM_EXTENT, &extent, mode,
+                                    flags, ldlm_blocking_ast,
+                                    ldlm_completion_ast, ldlm_glimpse_ast,
+                                    NULL, 0, LVB_T_NONE, NULL, lh);
+       if (err != ELDLM_OK)
+               RETURN(-EIO);
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(tgt_extent_lock);
+
+/*
+ * Drop the reference taken on an extent lock.  The handle must be in use.
+ */
+void tgt_extent_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
+{
+       LASSERT(lustre_handle_is_used(lh));
+
+       ldlm_lock_decref(lh, mode);
+}
+EXPORT_SYMBOL(tgt_extent_unlock);
+
+/*
+ * Take a server-side extent lock for a bulk request if the client asked
+ * for it.
+ *
+ * Nothing is locked (and 0 is returned) when there are no buffers or the
+ * first buffer does not carry OBD_BRW_SRVLOCK.  If the first buffer has
+ * the flag, all others must have it too, otherwise -EFAULT is returned.
+ * The lock spans from the first buffer's offset to the last byte of the
+ * last buffer.
+ */
+int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+                struct obd_ioobj *obj, struct niobuf_remote *nb,
+                struct lustre_handle *lh, int mode)
+{
+       struct niobuf_remote    *last;
+       __u64                    flags = 0;
+       int                      count = obj->ioo_bufcnt;
+       int                      idx;
+
+       ENTRY;
+
+       LASSERT(mode == LCK_PR || mode == LCK_PW);
+       LASSERT(!lustre_handle_is_used(lh));
+
+       if (count == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
+               RETURN(0);
+
+       for (idx = 1; idx < count; idx++) {
+               if (!(nb[idx].flags & OBD_BRW_SRVLOCK))
+                       RETURN(-EFAULT);
+       }
+
+       last = &nb[count - 1];
+       RETURN(tgt_extent_lock(ns, res_id, nb[0].offset,
+                              last->offset + last->len - 1,
+                              lh, mode, &flags));
+}
+EXPORT_SYMBOL(tgt_brw_lock);
+
+/*
+ * Release the lock taken by tgt_brw_lock(), if one was taken.
+ *
+ * Asserts that the handle is in use exactly when the request has buffers
+ * and the first one carries OBD_BRW_SRVLOCK.
+ */
+void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
+                   struct lustre_handle *lh, int mode)
+{
+       int locked;
+
+       ENTRY;
+
+       LASSERT(mode == LCK_PR || mode == LCK_PW);
+
+       locked = lustre_handle_is_used(lh);
+       LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
+               locked);
+       if (locked)
+               tgt_extent_unlock(lh, mode);
+       EXIT;
+}
+EXPORT_SYMBOL(tgt_brw_unlock);
+
+/*
+ * Fault-injection helper: replace bulk page \a idx of \a desc with
+ * tgt_page_to_corrupt, filled with a copy of the original data whose first
+ * (up to four) bytes are overwritten with \a pattern.
+ *
+ * Both temporary mappings are released before returning.  If no corruption
+ * page is available the descriptor is left untouched and an error is logged.
+ */
+static void tgt_corrupt_bulk_page(struct lu_target *tgt,
+                                 struct ptlrpc_bulk_desc *desc, int idx,
+                                 const char *pattern)
+{
+       struct page     *orig = desc->bd_iov[idx].kiov_page;
+       struct page     *np = tgt_page_to_corrupt;
+       int              off = desc->bd_iov[idx].kiov_offset & ~CFS_PAGE_MASK;
+       int              len = desc->bd_iov[idx].kiov_len;
+       char            *src;
+       char            *dst;
+
+       if (np == NULL) {
+               CERROR("%s: can't alloc page for corruption\n",
+                      tgt_name(tgt));
+               return;
+       }
+
+       src = kmap(orig) + off;
+       dst = kmap(np) + off;
+
+       memcpy(dst, src, len);
+       memcpy(dst, pattern, min(4, len));
+
+       kunmap(np);
+       /* every kmap() needs its kunmap(); the source page was leaked before */
+       kunmap(orig);
+
+       desc->bd_iov[idx].kiov_page = np;
+}
+
+/*
+ * Compute the checksum of all pages in a bulk descriptor.
+ *
+ * \a opc selects which fault-injection hook may fire on the first page:
+ * for OST_WRITE the page is corrupted before it is hashed, for OST_READ
+ * after it was hashed.
+ *
+ * Returns the 32-bit checksum.  NOTE(review): when the hash cannot be
+ * initialized the PTR_ERR() value is returned through the __u32 return
+ * type, so callers cannot tell it apart from a checksum; kept as is to
+ * preserve the interface.
+ */
+static __u32 tgt_checksum_bulk(struct lu_target *tgt,
+                              struct ptlrpc_bulk_desc *desc, int opc,
+                              cksum_type_t cksum_type)
+{
+       struct cfs_crypto_hash_desc     *hdesc;
+       unsigned int                    bufsize;
+       int                             i, err;
+       unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
+       /* initialized so a failing final() cannot return stack garbage */
+       __u32                           cksum = 0;
+
+       hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+       if (IS_ERR(hdesc)) {
+               CERROR("%s: unable to initialize checksum hash %s\n",
+                      tgt_name(tgt), cfs_crypto_hash_name(cfs_alg));
+               return PTR_ERR(hdesc);
+       }
+
+       CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
+       for (i = 0; i < desc->bd_iov_count; i++) {
+               /* corrupt the data before we compute the checksum, to
+                * simulate a client->OST data error */
+               if (i == 0 && opc == OST_WRITE &&
+                   OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE))
+                       tgt_corrupt_bulk_page(tgt, desc, i, "bad3");
+
+               cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
+                                 desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK,
+                                 desc->bd_iov[i].kiov_len);
+
+               /* corrupt the data after we compute the checksum, to
+                * simulate an OST->client data error */
+               if (i == 0 && opc == OST_READ &&
+                   OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND))
+                       tgt_corrupt_bulk_page(tgt, desc, i, "bad4");
+       }
+
+       bufsize = sizeof(cksum);
+       err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+       if (err)
+               cfs_crypto_hash_final(hdesc, NULL, NULL);
+
+       return cksum;
+}
+
+/*
+ * Handle a bulk read request.
+ *
+ * Takes a server-side PR extent lock if the client asked for one, prepares
+ * the pages through obd_preprw(), builds the bulk descriptor, optionally
+ * checksums the data, pushes it to the client with target_bulk_io() and
+ * finally calls obd_commitrw().
+ *
+ * Returns the number of bytes read on success or a negative errno.  If the
+ * request is dropped because of the deadline check after locking, or the
+ * bulk transfer failed, and the final rc is an error, rq_no_reply is set so
+ * that no reply is sent and the client retries.
+ */
+int tgt_brw_read(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct ptlrpc_bulk_desc *desc = NULL;
+       struct obd_export       *exp = tsi->tsi_exp;
+       struct niobuf_remote    *remote_nb;
+       struct niobuf_local     *local_nb;
+       struct obd_ioobj        *ioo;
+       struct ost_body         *body, *repbody;
+       struct l_wait_info       lwi;
+       struct lustre_handle     lockh = { 0 };
+       int                      niocount, npages, nob = 0, rc, i;
+       int                      no_reply = 0;
+       struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;
+
+       ENTRY;
+
+       if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
+               CERROR("%s: deny read request from %s to portal %u\n",
+                      tgt_name(tsi->tsi_tgt),
+                      obd_export_nid2str(req->rq_export),
+                      ptlrpc_req2svc(req)->srv_req_portal);
+               RETURN(-EPROTO);
+       }
+
+       req->rq_bulk_read = 1;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
+               RETURN(-EIO);
+
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
+
+       /* Check if there is eviction in progress, and if so, wait for it to
+        * finish */
+       if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+               /* We do not care how long it takes */
+               lwi = LWI_INTR(NULL, NULL);
+               rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
+                        !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
+                        &lwi);
+       }
+
+       /* There must be big cache in current thread to process this request
+        * if it is NULL then something went wrong and it wasn't allocated,
+        * report -ENOMEM in that case */
+       if (tbc == NULL)
+               RETURN(-ENOMEM);
+
+       body = tsi->tsi_ost_body;
+       LASSERT(body != NULL);
+
+       ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
+       LASSERT(ioo != NULL); /* must exist after tgt_ost_body_unpack */
+
+       niocount = ioo->ioo_bufcnt;
+       remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+       LASSERT(remote_nb != NULL); /* must exist after tgt_ost_body_unpack */
+
+       local_nb = tbc->local;
+
+       rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
+                         remote_nb, &lockh, LCK_PR);
+       if (rc != 0)
+               RETURN(rc);
+
+       /*
+        * If getting the lock took more time than
+        * client was willing to wait, drop it. b=11330
+        */
+       if (cfs_time_current_sec() > req->rq_deadline ||
+           OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
+               no_reply = 1;
+               /* the split literal needs the trailing space after "locking" */
+               CERROR("Dropping timed-out read from %s because locking "
+                      "object "DOSTID" took %ld seconds (limit was %ld).\n",
+                      libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
+                      cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
+                      req->rq_deadline - req->rq_arrival_time.tv_sec);
+               GOTO(out_lock, rc = -ETIMEDOUT);
+       }
+
+       repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       repbody->oa = body->oa;
+
+       npages = PTLRPC_MAX_BRW_PAGES;
+       rc = obd_preprw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1,
+                       ioo, remote_nb, &npages, local_nb, NULL, BYPASS_CAPA);
+       if (rc != 0)
+               GOTO(out_lock, rc);
+
+       desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+                                   BULK_PUT_SOURCE, OST_BULK_PORTAL);
+       if (desc == NULL)
+               GOTO(out_commitrw, rc = -ENOMEM);
+
+       nob = 0;
+       for (i = 0; i < npages; i++) {
+               int page_rc = local_nb[i].rc;
+
+               if (page_rc < 0) {
+                       rc = page_rc;
+                       break;
+               }
+
+               nob += page_rc;
+               if (page_rc != 0) { /* some data! */
+                       LASSERT(local_nb[i].page != NULL);
+                       ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
+                                                   local_nb[i].lnb_page_offset,
+                                                   page_rc);
+               }
+
+               if (page_rc != local_nb[i].len) { /* short read */
+                       /* All subsequent pages should be 0 */
+                       while (++i < npages)
+                               LASSERT(local_nb[i].rc == 0);
+                       break;
+               }
+       }
+
+       if (body->oa.o_valid & OBD_MD_FLCKSUM) {
+               cksum_type_t cksum_type =
+                       cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
+                                         body->oa.o_flags : 0);
+               repbody->oa.o_flags = cksum_type_pack(cksum_type);
+               repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+               repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc,
+                                                       OST_READ, cksum_type);
+               CDEBUG(D_PAGE, "checksum at read origin: %x\n",
+                      repbody->oa.o_cksum);
+       } else {
+               repbody->oa.o_valid = 0;
+       }
+       /* We're finishing using body->oa as an input variable */
+
+       /* Check if client was evicted while we were doing i/o before touching
+        * network */
+       if (likely(rc == 0 &&
+                  !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
+               rc = target_bulk_io(exp, desc, &lwi);
+               no_reply = rc != 0;
+       }
+
+out_commitrw:
+       /* Must commit after prep above in all cases */
+       rc = obd_commitrw(tsi->tsi_env, OBD_BRW_READ, exp,
+                         &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
+                         NULL, rc);
+       if (rc == 0)
+               tgt_drop_id(exp, &repbody->oa);
+out_lock:
+       tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PR);
+
+       if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
+               ptlrpc_free_bulk_nopin(desc);
+
+       LASSERT(rc <= 0);
+       if (rc == 0) {
+               rc = nob;
+               ptlrpc_lprocfs_brw(req, nob);
+       } else if (no_reply) {
+               req->rq_no_reply = 1;
+               /* reply out callback would free */
+               ptlrpc_req_drop_rs(req);
+               LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
+                             "client will retry: rc %d\n",
+                             exp->exp_obd->obd_name,
+                             obd_uuid2str(&exp->exp_client_uuid),
+                             obd_export_nid2str(exp), rc);
+       }
+       /* send a bulk after reply to simulate a network delay or reordering
+        * by a router; desc is still NULL if an earlier error path skipped
+        * its allocation, in which case there is nothing to send or free */
+       if (unlikely(desc != NULL &&
+                    CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
+               wait_queue_head_t        waitq;
+               struct l_wait_info       lwi1;
+
+               CDEBUG(D_INFO, "reorder BULK\n");
+               init_waitqueue_head(&waitq);
+
+               lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
+               l_wait_event(waitq, 0, &lwi1);
+               target_bulk_io(exp, desc, &lwi);
+               ptlrpc_free_bulk_nopin(desc);
+       }
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_brw_read);
+
+/*
+ * Report a mismatch between the client and server write checksums.
+ *
+ * When \a mmap is true only a rate-limited D_INFO debug message with both
+ * checksums is emitted.  Otherwise a console error is printed with the peer,
+ * the parent FID from the request's obdo (zeros unless OBD_MD_FLFID is
+ * set), the object id, the file extent covered by \a local_nb[0] through
+ * \a local_nb[npages - 1], and both checksums.  If the bulk sender NID
+ * differs from the request peer NID it is shown as " via <nid>".
+ */
+static void tgt_warn_on_cksum(struct ptlrpc_request *req,
+                             struct ptlrpc_bulk_desc *desc,
+                             struct niobuf_local *local_nb, int npages,
+                             obd_count client_cksum, obd_count server_cksum,
+                             bool mmap)
+{
+       struct obd_export *exp = req->rq_export;
+       struct ost_body *body;
+       char *router;
+       char *via;
+
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       LASSERT(body != NULL);
+
+       if (req->rq_peer.nid == desc->bd_sender) {
+               via = router = "";
+       } else {
+               via = " via ";
+               router = libcfs_nid2str(desc->bd_sender);
+       }
+
+       if (mmap) {
+               CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n",
+                            client_cksum, server_cksum);
+               return;
+       }
+
+       LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode "
+                          DFID" object "DOSTID" extent ["LPU64"-"LPU64
+                          "]: client csum %x, server csum %x\n",
+                          exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer),
+                          via, router,
+                          body->oa.o_valid & OBD_MD_FLFID ?
+                          body->oa.o_parent_seq : (__u64)0,
+                          body->oa.o_valid & OBD_MD_FLFID ?
+                          body->oa.o_parent_oid : 0,
+                          body->oa.o_valid & OBD_MD_FLFID ?
+                          body->oa.o_parent_ver : 0,
+                          POSTID(&body->oa.o_oi),
+                          local_nb[0].lnb_file_offset,
+                          local_nb[npages-1].lnb_file_offset +
+                          local_nb[npages-1].len - 1,
+                          client_cksum, server_cksum);
+}
+
+int tgt_brw_write(struct tgt_session_info *tsi)
+{
+       struct ptlrpc_request   *req = tgt_ses_req(tsi);
+       struct ptlrpc_bulk_desc *desc = NULL;
+       struct obd_export       *exp = req->rq_export;
+       struct niobuf_remote    *remote_nb;
+       struct niobuf_local     *local_nb;
+       struct obd_ioobj        *ioo;
+       struct ost_body         *body, *repbody;
+       struct l_wait_info       lwi;
+       struct lustre_handle     lockh = {0};
+       __u32                   *rcs;
+       int                      objcount, niocount, npages;
+       int                      rc, i, j;
+       cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
+       bool                     no_reply = false, mmap;
+       struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;
+
+       ENTRY;
+
+       if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
+               CERROR("%s: deny write request from %s to portal %u\n",
+                      tgt_name(tsi->tsi_tgt),
+                      obd_export_nid2str(req->rq_export),
+                      ptlrpc_req2svc(req)->srv_req_portal);
+               RETURN(err_serious(-EPROTO));
+       }
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
+               RETURN(err_serious(-ENOSPC));
+       if (OBD_FAIL_TIMEOUT(OBD_FAIL_OST_EROFS, 1))
+               RETURN(err_serious(-EROFS));
+
+       req->rq_bulk_write = 1;
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
+               RETURN(err_serious(-EIO));
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
+               RETURN(err_serious(-EFAULT));
+
+       /* pause before transaction has been started */
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
+
+       /* There must be big cache in current thread to process this request
+        * if it is NULL then something went wrong and it wasn't allocated,
+        * report -ENOMEM in that case */
+       if (tbc == NULL)
+               RETURN(-ENOMEM);
+
+       body = tsi->tsi_ost_body;
+       LASSERT(body != NULL);
+
+       ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+       LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */
+
+       objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+                                       RCL_CLIENT) / sizeof(*ioo);
+
+       for (niocount = i = 0; i < objcount; i++)
+               niocount += ioo[i].ioo_bufcnt;
+
+       remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+       LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */
+       if (niocount != req_capsule_get_size(&req->rq_pill,
+                                            &RMF_NIOBUF_REMOTE, RCL_CLIENT) /
+                       sizeof(*remote_nb))
+               RETURN(err_serious(-EPROTO));
+
+       if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
+           (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
+               memory_pressure_set();
+
+       req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
+                            niocount * sizeof(*rcs));
+       rc = req_capsule_server_pack(&req->rq_pill);
+       if (rc != 0)
+               GOTO(out, rc = err_serious(rc));
+
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
+       rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
+
+       local_nb = tbc->local;
+
+       rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
+                         remote_nb, &lockh, LCK_PW);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       /*
+        * If getting the lock took more time than
+        * client was willing to wait, drop it. b=11330
+        */
+       if (cfs_time_current_sec() > req->rq_deadline ||
+           OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
+               no_reply = true;
+               CERROR("%s: Dropping timed-out write from %s because locking "
+                      "object "DOSTID" took %ld seconds (limit was %ld).\n",
+                      tgt_name(tsi->tsi_tgt), libcfs_id2str(req->rq_peer),
+                      POSTID(&ioo->ioo_oid),
+                      cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
+                      req->rq_deadline - req->rq_arrival_time.tv_sec);
+               GOTO(out_lock, rc = -ETIMEDOUT);
+       }
+
+       /* Because we already sync grant info with client when reconnect,
+        * grant info will be cleared for resent req, then fed_grant and
+        * total_grant will not be modified in following preprw_write */
+       if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
+               DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
+               body->oa.o_valid &= ~OBD_MD_FLGRANT;
+       }
+
+       repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+       if (repbody == NULL)
+               GOTO(out_lock, rc = -ENOMEM);
+       repbody->oa = body->oa;
+
+       npages = PTLRPC_MAX_BRW_PAGES;
+       rc = obd_preprw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa,
+                       objcount, ioo, remote_nb, &npages, local_nb, NULL,
+                       BYPASS_CAPA);
+       if (rc < 0)
+               GOTO(out_lock, rc);
+
+       desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+                                   BULK_GET_SINK, OST_BULK_PORTAL);
+       if (desc == NULL)
+               GOTO(skip_transfer, rc = -ENOMEM);
+
+       /* NB Having prepped, we must commit... */
+       for (i = 0; i < npages; i++)
+               ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
+                                           local_nb[i].lnb_page_offset,
+                                           local_nb[i].len);
+
+       rc = sptlrpc_svc_prep_bulk(req, desc);
+       if (rc != 0)
+               GOTO(skip_transfer, rc);
+
+       rc = target_bulk_io(exp, desc, &lwi);
+       no_reply = rc != 0;
+
+skip_transfer:
+       if (body->oa.o_valid & OBD_MD_FLCKSUM && rc == 0) {
+               static int cksum_counter;
+
+               if (body->oa.o_valid & OBD_MD_FLFLAGS)
+                       cksum_type = cksum_type_unpack(body->oa.o_flags);
+
+               repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+               repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
+               repbody->oa.o_flags |= cksum_type_pack(cksum_type);
+               repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc,
+                                                       OST_WRITE, cksum_type);
+               cksum_counter++;
+
+               if (unlikely(body->oa.o_cksum != repbody->oa.o_cksum)) {
+                       mmap = (body->oa.o_valid & OBD_MD_FLFLAGS &&
+                               body->oa.o_flags & OBD_FL_MMAP);
+
+                       tgt_warn_on_cksum(req, desc, local_nb, npages,
+                                         body->oa.o_cksum,
+                                         repbody->oa.o_cksum, mmap);
+                       cksum_counter = 0;
+               } else if ((cksum_counter & (-cksum_counter)) ==
+                          cksum_counter) {
+                       CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
+                              cksum_counter, libcfs_id2str(req->rq_peer),
+                              repbody->oa.o_cksum);
+               }
+       }
+
+       /* Must commit after prep above in all cases */
+       rc = obd_commitrw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa,
+                         objcount, ioo, remote_nb, npages, local_nb, NULL,
+                         rc);
+       if (rc == -ENOTCONN)
+               /* quota acquire process has been given up because
+                * either the client has been evicted or the client
+                * has timed out the request already */
+               no_reply = true;
+
+       /*
+        * Disable sending mtime back to the client. If the client locked the
+        * whole object, then it has already updated the mtime on its side,
+        * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
+        */
+       repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
+
+       if (rc == 0) {
+               int nob = 0;
+
+               /* set per-requested niobuf return codes */
+               for (i = j = 0; i < niocount; i++) {
+                       int len = remote_nb[i].len;
+
+                       nob += len;
+                       rcs[i] = 0;
+                       do {
+                               LASSERT(j < npages);
+                               if (local_nb[j].rc < 0)
+                                       rcs[i] = local_nb[j].rc;
+                               len -= local_nb[j].len;
+                               j++;
+                       } while (len > 0);
+                       LASSERT(len == 0);
+               }
+               LASSERT(j == npages);
+               ptlrpc_lprocfs_brw(req, nob);
+
+               tgt_drop_id(exp, &repbody->oa);
+       }
+out_lock:
+       tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PW);
+       if (desc)
+               ptlrpc_free_bulk_nopin(desc);
+out:
+       if (no_reply) {
+               req->rq_no_reply = 1;
+               /* reply out callback would free */
+               ptlrpc_req_drop_rs(req);
+               LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
+                             "client will retry: rc %d\n",
+                             exp->exp_obd->obd_name,
+                             obd_uuid2str(&exp->exp_client_uuid),
+                             obd_export_nid2str(exp), rc);
+       }
+       memory_pressure_clr();
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_brw_write);
index 5f45ca1..bc7809c 100644 (file)
@@ -195,4 +195,10 @@ int out_handle(struct tgt_session_info *tsi);
 #define out_tx_destroy(info, obj, th, reply, idx) \
        __out_tx_destroy(info, obj, th, reply, idx, __FILE__, __LINE__)
 
+extern struct page *tgt_page_to_corrupt;
+
+struct tgt_thread_big_cache {  /* niobuf_local array for a max-size BRW */
+       struct niobuf_local     local[PTLRPC_MAX_BRW_PAGES];
+};
+
 #endif /* _TG_INTERNAL_H */
index 2694aaa..01a757f 100644 (file)
@@ -720,8 +720,13 @@ int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
        ted = &req->rq_export->exp_target_data;
 
        lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
+       if (ted->ted_lr_idx < 0 && !lw_client)
+               /* ofd connect may cause transaction before export has
+                * last_rcvd slot */
+               RETURN(0);
 
        tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+
        spin_lock(&tgt->lut_translock);
        if (th->th_result != 0) {
                if (tti->tti_transno != 0) {
@@ -765,7 +770,7 @@ int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
                 * last_rcvd, we still want to maintain the in-memory
                 * lsd_client_data structure in order to properly handle reply
                 * reconstruction. */
-       } else if (ted->ted_lr_off <= 0) {
+       } else if (ted->ted_lr_off == 0) {
                CERROR("%s: client idx %d has offset %lld\n",
                       tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
                RETURN(-EINVAL);
@@ -839,3 +844,46 @@ srv_update:
 }
 EXPORT_SYMBOL(tgt_last_rcvd_update);
 
+/*
+ * last_rcvd update for echo client simulation.
+ * It updates the client's last_rcvd slot and, on success, the object
+ * version in a simple way, but takes all the locks to simulate all drawbacks
+ */
+int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
+                             struct dt_object *obj, struct thandle *th,
+                             struct obd_export *exp)
+{
+       struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct tg_export_data   *ted = &exp->exp_target_data;
+       int                      rc = 0;
+
+       ENTRY;
+
+       tti->tti_transno = 0; /* stays 0 if the transaction has failed */
+
+       spin_lock(&tgt->lut_translock);
+       if (th->th_result == 0)
+               tti->tti_transno = ++tgt->lut_last_transno;
+       spin_unlock(&tgt->lut_translock);
+
+       /** VBR: use the new transno as the object version */
+       if (th->th_result == 0 && obj != NULL)
+               dt_version_set(env, obj, tti->tti_transno, th);
+
+       /* if the commit callback can't be added, do a sync write */
+       th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
+                                               tti->tti_transno);
+
+       LASSERT(ted->ted_lr_off > 0);
+
+       mutex_lock(&ted->ted_lcd_lock);
+       LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
+       ted->ted_lcd->lcd_last_transno = tti->tti_transno;
+       ted->ted_lcd->lcd_last_result = th->th_result;
+
+       tti->tti_off = ted->ted_lr_off;
+       rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+       mutex_unlock(&ted->ted_lcd_lock);
+       RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_last_rcvd_update_echo);
index bcf2b2f..4f7cff5 100644 (file)
@@ -69,6 +69,9 @@ int tgt_init(const struct lu_env *env, struct lu_target *lut,
        lut->lut_mds_capa = 1;
        lut->lut_oss_capa = 1;
 
+       spin_lock_init(&lut->lut_flags_lock);
+       lut->lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
+
        /* last_rcvd initialization is needed by replayable targets only */
        if (!obd->obd_replayable)
                RETURN(0);
@@ -140,10 +143,21 @@ EXPORT_SYMBOL(tgt_session_key);
 
 LU_KEY_INIT_GENERIC(tgt_ses);
 
+/*
+ * This page is allocated once, when the module is initialized.
+ * It is used to simulate data corruption, see ost_checksum_bulk()
+ * for details. As the original pages provided by the layers below
+ * can remain in the internal cache, we do not want to modify
+ * them.
+ */
+struct page *tgt_page_to_corrupt;
+
 int tgt_mod_init(void)
 {
        ENTRY;
 
+       tgt_page_to_corrupt = alloc_page(GFP_IOFS);
+
        tgt_key_init_generic(&tgt_thread_key, NULL);
        lu_context_key_register_many(&tgt_thread_key, NULL);
 
@@ -155,6 +169,9 @@ int tgt_mod_init(void)
 
 void tgt_mod_exit(void)
 {
+       if (tgt_page_to_corrupt != NULL) /* alloc_page() result is unchecked */
+               page_cache_release(tgt_page_to_corrupt);
+
        lu_context_key_degister(&tgt_thread_key);
        lu_context_key_degister(&tgt_session_key);
 }