Whamcloud - gitweb
LU-3285 mdt: IO request handling in MDT 13/28013/19
author Mikhal Pershin <mike.pershin@intel.com>
Mon, 7 Dec 2015 10:19:00 +0000 (13:19 +0300)
committer Mike Pershin <mike.pershin@intel.com>
Tue, 17 Oct 2017 19:08:08 +0000 (19:08 +0000)
Add methods to handle IO requests in MDT similar to OFD.
Introduce the MDS_INODELOCK_DOM bit for data on MDT, distinguish
IO requests to MDT from those to OST, and take the appropriate
lock in target code.

Change-Id: I7feaa00c381f821510ca1343b042ed5f09050ac6
Signed-off-by: Mikhal Pershin <mike.pershin@intel.com>
Reviewed-on: https://review.whamcloud.com/28013
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
14 files changed:
lustre/include/lu_target.h
lustre/include/lustre_osc.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/llite/llite_lib.c
lustre/mdc/mdc_request.c
lustre/mdt/Makefile.in
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_io.c [new file with mode: 0644]
lustre/mdt/mdt_lproc.c
lustre/mdt/mdt_mds.c
lustre/osc/osc_request.c
lustre/osd-zfs/osd_object.c
lustre/target/tgt_handler.c

index 0d3ef96..fa9ad61 100644 (file)
@@ -426,15 +426,12 @@ int tgt_sync(const struct lu_env *env, struct lu_target *tgt,
 int tgt_io_thread_init(struct ptlrpc_thread *thread);
 void tgt_io_thread_done(struct ptlrpc_thread *thread);
 
 int tgt_io_thread_init(struct ptlrpc_thread *thread);
 void tgt_io_thread_done(struct ptlrpc_thread *thread);
 
+int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+                     struct lustre_handle *lh, int mode, __u64 *flags);
 int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
                    __u64 start, __u64 end, struct lustre_handle *lh,
                    int mode, __u64 *flags);
 void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode);
 int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
                    __u64 start, __u64 end, struct lustre_handle *lh,
                    int mode, __u64 *flags);
 void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode);
-int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
-                struct obd_ioobj *obj, struct niobuf_remote *nb,
-                struct lustre_handle *lh, enum ldlm_mode mode);
-void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
-                   struct lustre_handle *lh, enum ldlm_mode mode);
 int tgt_brw_read(struct tgt_session_info *tsi);
 int tgt_brw_write(struct tgt_session_info *tsi);
 int tgt_hpreq_handler(struct ptlrpc_request *req);
 int tgt_brw_read(struct tgt_session_info *tsi);
 int tgt_brw_write(struct tgt_session_info *tsi);
 int tgt_hpreq_handler(struct ptlrpc_request *req);
index 48048af..d693db7 100644 (file)
@@ -521,6 +521,9 @@ int osc_attr_update(const struct lu_env *env, struct cl_object *obj,
 int osc_object_glimpse(const struct lu_env *env, const struct cl_object *obj,
                       struct ost_lvb *lvb);
 
 int osc_object_glimpse(const struct lu_env *env, const struct cl_object *obj,
                       struct ost_lvb *lvb);
 
+/* osc_request.c */
+void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd);
+
 /*****************************************************************************
  *
  * Accessors and type conversions.
 /*****************************************************************************
  *
  * Accessors and type conversions.
index 597bc36..8730351 100644 (file)
 #define MDC_REPLY_PORTAL               10
 //#define MDC_BULK_PORTAL              11
 #define MDS_REQUEST_PORTAL             12
 #define MDC_REPLY_PORTAL               10
 //#define MDC_BULK_PORTAL              11
 #define MDS_REQUEST_PORTAL             12
-//#define MDS_REPLY_PORTAL             13
+#define MDS_IO_PORTAL                  13
 #define MDS_BULK_PORTAL                14
 #define LDLM_CB_REQUEST_PORTAL         15
 #define LDLM_CB_REPLY_PORTAL           16
 #define MDS_BULK_PORTAL                14
 #define LDLM_CB_REQUEST_PORTAL         15
 #define LDLM_CB_REPLY_PORTAL           16
@@ -845,8 +845,9 @@ struct ptlrpc_body_v2 {
                                OBD_CONNECT_FLOCK_DEAD | \
                                OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | \
                                OBD_CONNECT_OPEN_BY_FID | \
                                OBD_CONNECT_FLOCK_DEAD | \
                                OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | \
                                OBD_CONNECT_OPEN_BY_FID | \
-                               OBD_CONNECT_DIR_STRIPE | \
-                               OBD_CONNECT_BULK_MBITS | \
+                               OBD_CONNECT_DIR_STRIPE | OBD_CONNECT_GRANT | \
+                               OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_SRVLOCK | \
+                               OBD_CONNECT_BULK_MBITS | OBD_CONNECT_CKSUM | \
                                OBD_CONNECT_MULTIMODRPCS | \
                                OBD_CONNECT_SUBTREE | OBD_CONNECT_LARGE_ACL | \
                                OBD_CONNECT_FLAGS2)
                                OBD_CONNECT_MULTIMODRPCS | \
                                OBD_CONNECT_SUBTREE | OBD_CONNECT_LARGE_ACL | \
                                OBD_CONNECT_FLAGS2)
@@ -1602,8 +1603,9 @@ typedef enum {
  * will grant LOOKUP_LOCK. */
 #define MDS_INODELOCK_PERM   0x000010
 #define MDS_INODELOCK_XATTR  0x000020  /* extended attributes */
  * will grant LOOKUP_LOCK. */
 #define MDS_INODELOCK_PERM   0x000010
 #define MDS_INODELOCK_XATTR  0x000020  /* extended attributes */
+#define MDS_INODELOCK_DOM    0x000040 /* Data for data-on-mdt files */
 
 
-#define MDS_INODELOCK_MAXSHIFT 5
+#define MDS_INODELOCK_MAXSHIFT 6
 /* This FULL lock is useful to take on unlink sort of operations */
 #define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1)
 
 /* This FULL lock is useful to take on unlink sort of operations */
 #define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1)
 
index 87159a0..516c4f4 100644 (file)
@@ -197,12 +197,12 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
         }
 
        /* indicate MDT features supported by this client */
         }
 
        /* indicate MDT features supported by this client */
-        data->ocd_connect_flags = OBD_CONNECT_IBITS    | OBD_CONNECT_NODEVOH  |
-                                  OBD_CONNECT_ATTRFID  |
-                                  OBD_CONNECT_VERSION  | OBD_CONNECT_BRW_SIZE |
-                                  OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA |
-                                  OBD_CONNECT_CANCELSET | OBD_CONNECT_FID     |
-                                  OBD_CONNECT_AT       | OBD_CONNECT_LOV_V3   |
+       data->ocd_connect_flags = OBD_CONNECT_IBITS    | OBD_CONNECT_NODEVOH  |
+                                 OBD_CONNECT_ATTRFID  | OBD_CONNECT_GRANT |
+                                 OBD_CONNECT_VERSION  | OBD_CONNECT_BRW_SIZE |
+                                 OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA |
+                                 OBD_CONNECT_CANCELSET | OBD_CONNECT_FID     |
+                                 OBD_CONNECT_AT       | OBD_CONNECT_LOV_V3   |
                                  OBD_CONNECT_VBR | OBD_CONNECT_FULL20 |
                                  OBD_CONNECT_64BITHASH |
                                  OBD_CONNECT_EINPROGRESS |
                                  OBD_CONNECT_VBR | OBD_CONNECT_FULL20 |
                                  OBD_CONNECT_64BITHASH |
                                  OBD_CONNECT_EINPROGRESS |
@@ -213,7 +213,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                  OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK |
                                  OBD_CONNECT_OPEN_BY_FID |
                                  OBD_CONNECT_DIR_STRIPE |
                                  OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK |
                                  OBD_CONNECT_OPEN_BY_FID |
                                  OBD_CONNECT_DIR_STRIPE |
-                                 OBD_CONNECT_BULK_MBITS |
+                                 OBD_CONNECT_BULK_MBITS | OBD_CONNECT_CKSUM |
                                  OBD_CONNECT_SUBTREE |
                                  OBD_CONNECT_FLAGS2 | OBD_CONNECT_MULTIMODRPCS;
 
                                  OBD_CONNECT_SUBTREE |
                                  OBD_CONNECT_FLAGS2 | OBD_CONNECT_MULTIMODRPCS;
 
@@ -228,6 +228,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt,
                                   OBD_CONNECT_LARGE_ACL;
 #endif
 
                                   OBD_CONNECT_LARGE_ACL;
 #endif
 
+       data->ocd_cksum_types = cksum_types_supported_client();
+
        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_LIGHTWEIGHT))
                /* flag mdc connection as lightweight, only used for test
                 * purpose, use with care */
        if (OBD_FAIL_CHECK(OBD_FAIL_MDC_LIGHTWEIGHT))
                /* flag mdc connection as lightweight, only used for test
                 * purpose, use with care */
index 718d6ff..cc5c7d0 100644 (file)
@@ -56,6 +56,7 @@
 #include <uapi/linux/lustre/lustre_param.h>
 #include <lustre_swab.h>
 #include <obd_class.h>
 #include <uapi/linux/lustre/lustre_param.h>
 #include <lustre_swab.h>
 #include <obd_class.h>
+#include <lustre_osc.h>
 
 #include "mdc_internal.h"
 
 
 #include "mdc_internal.h"
 
@@ -2354,7 +2355,15 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
        LASSERT(imp->imp_obd == obd);
 
        switch (event) {
        LASSERT(imp->imp_obd == obd);
 
        switch (event) {
+       case IMP_EVENT_DISCON: {
+               struct client_obd *cli = &obd->u.cli;
 
 
+               spin_lock(&cli->cl_loi_list_lock);
+               cli->cl_avail_grant = 0;
+               cli->cl_lost_grant = 0;
+               spin_unlock(&cli->cl_loi_list_lock);
+               break;
+       }
        case IMP_EVENT_INACTIVE: {
                struct client_obd *cli = &obd->u.cli;
                /*
        case IMP_EVENT_INACTIVE: {
                struct client_obd *cli = &obd->u.cli;
                /*
@@ -2382,10 +2391,15 @@ static int mdc_import_event(struct obd_device *obd, struct obd_import *imp,
                if (rc == 0)
                        rc = mdc_kuc_reregister(imp);
                break;
                if (rc == 0)
                        rc = mdc_kuc_reregister(imp);
                break;
-       case IMP_EVENT_OCD:
+       case IMP_EVENT_OCD: {
+               struct obd_connect_data *ocd = &imp->imp_connect_data;
+
+               if (OCD_HAS_FLAG(ocd, GRANT))
+                       osc_init_grant(&obd->u.cli, ocd);
+
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
                break;
                rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
                break;
-       case IMP_EVENT_DISCON:
+       }
        case IMP_EVENT_DEACTIVATE:
        case IMP_EVENT_ACTIVATE:
                break;
        case IMP_EVENT_DEACTIVATE:
        case IMP_EVENT_ACTIVATE:
                break;
index 559b9f6..0165cfe 100644 (file)
@@ -1,7 +1,7 @@
 MODULES := mdt
 mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
 mdt-objs += mdt_open.o mdt_identity.o mdt_lproc.o mdt_fs.o
 MODULES := mdt
 mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
 mdt-objs += mdt_open.o mdt_identity.o mdt_lproc.o mdt_fs.o
-mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o
+mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o mdt_io.o
 mdt-objs += mdt_hsm_cdt_actions.o
 mdt-objs += mdt_hsm_cdt_requests.o
 mdt-objs += mdt_hsm_cdt_client.o
 mdt-objs += mdt_hsm_cdt_actions.o
 mdt-objs += mdt_hsm_cdt_requests.o
 mdt-objs += mdt_hsm_cdt_client.o
index dfaa281..4916d40 100644 (file)
@@ -61,7 +61,7 @@
 #include <obd.h>
 #include <obd_support.h>
 #include <lustre_barrier.h>
 #include <obd.h>
 #include <obd_support.h>
 #include <lustre_barrier.h>
-
+#include <obd_cksum.h>
 #include <llog_swab.h>
 
 #include "mdt_internal.h"
 #include <llog_swab.h>
 
 #include "mdt_internal.h"
@@ -665,17 +665,21 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
                /* if no object is allocated on osts, the size on mds is valid.
                 * b=22272 */
                b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
                /* if no object is allocated on osts, the size on mds is valid.
                 * b=22272 */
                b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
-       } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL &&
-                  mdt_hsm_is_released(ma->ma_lmm)) {
-               /* A released file stores its size on MDS. */
-               /* But return 1 block for released file, unless tools like tar
-                * will consider it fully sparse. (LU-3864)
-                */
-               if (unlikely(b->mbo_size == 0))
-                       b->mbo_blocks = 0;
-               else
-                       b->mbo_blocks = 1;
-               b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+       } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
+               if (mdt_hsm_is_released(ma->ma_lmm)) {
+                       /* A released file stores its size on MDS. */
+                       /* But return 1 block for released file, or tools
+                        * like tar will consider it fully sparse. (LU-3864)
+                        */
+                       if (unlikely(b->mbo_size == 0))
+                               b->mbo_blocks = 0;
+                       else
+                               b->mbo_blocks = 1;
+                       b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+               } else if (lov_pattern(ma->ma_lmm->lmm_pattern) ==
+                          LOV_PATTERN_MDT) {
+                       b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+               }
        }
 
        if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE))
        }
 
        if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE))
@@ -2082,20 +2086,21 @@ static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
 }
 
 /* this should sync this object */
 }
 
 /* this should sync this object */
-static int mdt_object_sync(struct mdt_thread_info *info)
+static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
+                          struct mdt_object *mo)
 {
 {
-       struct md_object *next;
        int rc;
        int rc;
+
        ENTRY;
 
        ENTRY;
 
-       if (!mdt_object_exists(info->mti_object)) {
+       if (!mdt_object_exists(mo)) {
                CWARN("%s: non existing object "DFID": rc = %d\n",
                CWARN("%s: non existing object "DFID": rc = %d\n",
-                     mdt_obd_name(info->mti_mdt),
-                     PFID(mdt_object_fid(info->mti_object)), -ESTALE);
+                     exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
+                     -ESTALE);
                RETURN(-ESTALE);
        }
                RETURN(-ESTALE);
        }
-       next = mdt_object_child(info->mti_object);
-       rc = mo_object_sync(info->mti_env, next);
+
+       rc = mo_object_sync(env, mdt_object_child(mo));
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -2118,7 +2123,8 @@ static int mdt_sync(struct tgt_session_info *tsi)
                struct mdt_thread_info *info = tsi2mdt_info(tsi);
 
                /* sync an object */
                struct mdt_thread_info *info = tsi2mdt_info(tsi);
 
                /* sync an object */
-               rc = mdt_object_sync(info);
+               rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
+                                    info->mti_object);
                if (rc == 0) {
                        const struct lu_fid *fid;
                        struct lu_attr *la = &info->mti_attr.ma_attr;
                if (rc == 0) {
                        const struct lu_fid *fid;
                        struct lu_attr *la = &info->mti_attr.ma_attr;
@@ -2142,6 +2148,54 @@ static int mdt_sync(struct tgt_session_info *tsi)
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+static int mdt_data_sync(struct tgt_session_info *tsi)
+{
+       struct mdt_thread_info *info;
+       struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
+       struct ost_body *body = tsi->tsi_ost_body;
+       struct ost_body *repbody;
+       struct mdt_object *mo = NULL;
+       struct md_attr *ma;
+       int rc = 0;
+
+       ENTRY;
+
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+
+       /* if no fid is specified then do nothing,
+        * device sync is done via MDS_SYNC */
+       if (fid_is_zero(&tsi->tsi_fid))
+               RETURN(0);
+
+       mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
+       if (IS_ERR(mo))
+               RETURN(PTR_ERR(mo));
+
+       rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo);
+       if (rc)
+               GOTO(put, rc);
+
+       repbody->oa.o_oi = body->oa.o_oi;
+       repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+       info = tsi2mdt_info(tsi);
+       ma = &info->mti_attr;
+       ma->ma_need = MA_INODE;
+       ma->ma_valid = 0;
+       rc = mdt_attr_get_complex(info, mo, ma);
+       if (rc == 0)
+               obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS);
+       else
+               rc = 0;
+       mdt_thread_info_fini(info);
+
+       EXIT;
+put:
+       if (mo != NULL)
+               mdt_object_put(tsi->tsi_env, mo);
+       return rc;
+}
+
 /*
  * Handle quota control requests to consult current usage/limit, but also
  * to configure quota enforcement
 /*
  * Handle quota control requests to consult current usage/limit, but also
  * to configure quota enforcement
@@ -2865,8 +2919,8 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
  * \param mode lock mode
  * \param decref force immediate lock releasing
  */
  * \param mode lock mode
  * \param decref force immediate lock releasing
  */
-static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
-                         enum ldlm_mode mode, int decref)
+void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
+                  enum ldlm_mode mode, int decref)
 {
        ENTRY;
 
 {
        ENTRY;
 
@@ -4631,6 +4685,11 @@ static int mdt_tgt_getxattr(struct tgt_session_info *tsi)
        return rc;
 }
 
        return rc;
 }
 
+#define OBD_FAIL_OST_READ_NET  OBD_FAIL_OST_BRW_NET
+#define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
+#define OST_BRW_READ   OST_READ
+#define OST_BRW_WRITE  OST_WRITE
+
 static struct tgt_handler mdt_tgt_handlers[] = {
 TGT_RPC_HANDLER(MDS_FIRST_OPC,
                0,                      MDS_CONNECT,    mdt_tgt_connect,
 static struct tgt_handler mdt_tgt_handlers[] = {
 TGT_RPC_HANDLER(MDS_FIRST_OPC,
                0,                      MDS_CONNECT,    mdt_tgt_connect,
@@ -4671,6 +4730,14 @@ TGT_MDT_HDL(HABEO_CLAVIS | HABEO_CORPUS | HABEO_REFERO | MUTABOR,
            mdt_swap_layouts),
 };
 
            mdt_swap_layouts),
 };
 
+static struct tgt_handler mdt_io_ops[] = {
+TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_BRW_READ, tgt_brw_read),
+TGT_OST_HDL(HABEO_CORPUS | MUTABOR,     OST_BRW_WRITE, tgt_brw_write),
+TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO | MUTABOR,
+                                        OST_PUNCH,     mdt_punch_hdl),
+TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_SYNC,     mdt_data_sync),
+};
+
 static struct tgt_handler mdt_sec_ctx_ops[] = {
 TGT_SEC_HDL_VAR(0,                     SEC_CTX_INIT,     mdt_sec_ctx_handle),
 TGT_SEC_HDL_VAR(0,                     SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
 static struct tgt_handler mdt_sec_ctx_ops[] = {
 TGT_SEC_HDL_VAR(0,                     SEC_CTX_INIT,     mdt_sec_ctx_handle),
 TGT_SEC_HDL_VAR(0,                     SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
@@ -4732,7 +4799,11 @@ static struct tgt_opc_slice mdt_common_slice[] = {
                .tos_opc_end    = LFSCK_LAST_OPC,
                .tos_hs         = tgt_lfsck_handlers
        },
                .tos_opc_end    = LFSCK_LAST_OPC,
                .tos_hs         = tgt_lfsck_handlers
        },
-
+       {
+               .tos_opc_start  = OST_FIRST_OPC,
+               .tos_opc_end    = OST_LAST_OPC,
+               .tos_hs         = mdt_io_ops
+       },
        {
                .tos_hs         = NULL
        }
        {
                .tos_hs         = NULL
        }
@@ -5157,6 +5228,7 @@ static struct lu_object *mdt_object_alloc(const struct lu_env *env,
                o->lo_ops = &mdt_obj_ops;
                spin_lock_init(&mo->mot_write_lock);
                mutex_init(&mo->mot_lov_mutex);
                o->lo_ops = &mdt_obj_ops;
                spin_lock_init(&mo->mot_write_lock);
                mutex_init(&mo->mot_lov_mutex);
+               init_rwsem(&mo->mot_dom_sem);
                init_rwsem(&mo->mot_open_sem);
                atomic_set(&mo->mot_open_count, 0);
                RETURN(o);
                init_rwsem(&mo->mot_open_sem);
                atomic_set(&mo->mot_open_count, 0);
                RETURN(o);
@@ -5325,9 +5397,10 @@ static int mdt_obd_set_info_async(const struct lu_env *env,
  * \retval -EPROTO \a data unexpectedly has zero obd_connect_data::ocd_brw_size
  * \retval -EBADE  client and server feature requirements are incompatible
  */
  * \retval -EPROTO \a data unexpectedly has zero obd_connect_data::ocd_brw_size
  * \retval -EBADE  client and server feature requirements are incompatible
  */
-static int mdt_connect_internal(struct obd_export *exp,
+static int mdt_connect_internal(const struct lu_env *env,
+                               struct obd_export *exp,
                                struct mdt_device *mdt,
                                struct mdt_device *mdt,
-                               struct obd_connect_data *data)
+                               struct obd_connect_data *data, bool reconnect)
 {
        LASSERT(data != NULL);
 
 {
        LASSERT(data != NULL);
 
@@ -5373,6 +5446,10 @@ static int mdt_connect_internal(struct obd_export *exp,
                }
        }
 
                }
        }
 
+       if (OCD_HAS_FLAG(data, GRANT))
+               data->ocd_grant = mdt_grant_connect(env, exp, data->ocd_grant,
+                                                   !reconnect);
+
        /* NB: Disregard the rule against updating
         * exp_connect_data.ocd_connect_flags in this case, since
         * tgt_client_new() needs to know if this is a lightweight
        /* NB: Disregard the rule against updating
         * exp_connect_data.ocd_connect_flags in this case, since
         * tgt_client_new() needs to know if this is a lightweight
@@ -5416,6 +5493,32 @@ static int mdt_connect_internal(struct obd_export *exp,
                spin_unlock(&exp->exp_lock);
        }
 
                spin_unlock(&exp->exp_lock);
        }
 
+       if (OCD_HAS_FLAG(data, CKSUM)) {
+               __u32 cksum_types = data->ocd_cksum_types;
+
+               /* The client set in ocd_cksum_types the checksum types it
+                * supports. We have to mask off the algorithms that we don't
+                * support */
+               data->ocd_cksum_types &= cksum_types_supported_server();
+
+               if (unlikely(data->ocd_cksum_types == 0)) {
+                       CERROR("%s: Connect with checksum support but no "
+                              "ocd_cksum_types is set\n",
+                              exp->exp_obd->obd_name);
+                       RETURN(-EPROTO);
+               }
+
+               CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return "
+                      "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp),
+                      cksum_types, data->ocd_cksum_types);
+       } else {
+               /* This client does not support OBD_CONNECT_CKSUM
+                * fall back to CRC32 */
+               CDEBUG(D_RPCTRACE, "%s: cli %s does not support "
+                      "OBD_CONNECT_CKSUM, CRC32 will be used\n",
+                      exp->exp_obd->obd_name, obd_export_nid2str(exp));
+       }
+
        return 0;
 }
 
        return 0;
 }
 
@@ -5619,7 +5722,7 @@ static int mdt_obd_connect(const struct lu_env *env,
        if (rc != 0 && rc != -EEXIST)
                GOTO(out, rc);
 
        if (rc != 0 && rc != -EEXIST)
                GOTO(out, rc);
 
-       rc = mdt_connect_internal(lexp, mdt, data);
+       rc = mdt_connect_internal(env, lexp, mdt, data, false);
        if (rc == 0) {
                struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd;
 
        if (rc == 0) {
                struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd;
 
@@ -5665,7 +5768,8 @@ static int mdt_obd_reconnect(const struct lu_env *env,
        if (rc != 0 && rc != -EEXIST)
                RETURN(rc);
 
        if (rc != 0 && rc != -EEXIST)
                RETURN(rc);
 
-       rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
+       rc = mdt_connect_internal(env, exp, mdt_dev(obd->obd_lu_dev), data,
+                                 true);
        if (rc == 0)
                mdt_export_stats_init(obd, exp, localdata);
        else
        if (rc == 0)
                mdt_export_stats_init(obd, exp, localdata);
        else
@@ -6292,6 +6396,9 @@ static struct obd_ops mdt_obd_device_ops = {
         .o_destroy_export = mdt_destroy_export,
         .o_iocontrol      = mdt_iocontrol,
         .o_postrecov      = mdt_obd_postrecov,
         .o_destroy_export = mdt_destroy_export,
         .o_iocontrol      = mdt_iocontrol,
         .o_postrecov      = mdt_obd_postrecov,
+       /* Data-on-MDT IO methods */
+       .o_preprw         = mdt_obd_preprw,
+       .o_commitrw       = mdt_obd_commitrw,
 };
 
 static struct lu_device* mdt_device_fini(const struct lu_env *env,
 };
 
 static struct lu_device* mdt_device_fini(const struct lu_env *env,
index 23c993c..bb8e2df 100644 (file)
@@ -272,6 +272,8 @@ struct mdt_object {
        spinlock_t              mot_write_lock;
         /* Lock to protect create_data */
        struct mutex            mot_lov_mutex;
        spinlock_t              mot_write_lock;
         /* Lock to protect create_data */
        struct mutex            mot_lov_mutex;
+       /* lock to protect read/write stages for Data-on-MDT files */
+       struct rw_semaphore     mot_dom_sem;
        /* Lock to protect lease open.
         * Lease open acquires write lock; normal open acquires read lock */
        struct rw_semaphore     mot_open_sem;
        /* Lock to protect lease open.
         * Lease open acquires write lock; normal open acquires read lock */
        struct rw_semaphore     mot_open_sem;
@@ -643,6 +645,8 @@ int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *mo,
 
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *mo,
                       struct mdt_lock_handle *lh, int decref);
 
 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *mo,
                       struct mdt_lock_handle *lh, int decref);
+void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
+                  enum ldlm_mode mode, int decref);
 
 struct mdt_object *mdt_object_new(const struct lu_env *env,
                                  struct mdt_device *,
 
 struct mdt_object *mdt_object_new(const struct lu_env *env,
                                  struct mdt_device *,
@@ -1074,9 +1078,12 @@ enum {
         LPROC_MDT_SETXATTR,
         LPROC_MDT_STATFS,
         LPROC_MDT_SYNC,
         LPROC_MDT_SETXATTR,
         LPROC_MDT_STATFS,
         LPROC_MDT_SYNC,
-        LPROC_MDT_SAMEDIR_RENAME,
-        LPROC_MDT_CROSSDIR_RENAME,
-        LPROC_MDT_LAST,
+       LPROC_MDT_SAMEDIR_RENAME,
+       LPROC_MDT_CROSSDIR_RENAME,
+       LPROC_MDT_IO_READ,
+       LPROC_MDT_IO_WRITE,
+       LPROC_MDT_IO_PUNCH,
+       LPROC_MDT_LAST,
 };
 void mdt_counter_incr(struct ptlrpc_request *req, int opcode);
 void mdt_stats_counter_init(struct lprocfs_stats *stats);
 };
 void mdt_counter_incr(struct ptlrpc_request *req, int opcode);
 void mdt_stats_counter_init(struct lprocfs_stats *stats);
@@ -1117,4 +1124,24 @@ static inline char *mdt_req_get_jobid(struct ptlrpc_request *req)
        return jobid;
 }
 
        return jobid;
 }
 
+/* MDT IO */
+
+#define VALID_FLAGS (LA_TYPE | LA_MODE | LA_SIZE | LA_BLOCKS | \
+                    LA_BLKSIZE | LA_ATIME | LA_MTIME | LA_CTIME)
+
+int mdt_obd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
+                  struct obdo *oa, int objcount, struct obd_ioobj *obj,
+                  struct niobuf_remote *rnb, int *nr_local,
+                  struct niobuf_local *lnb);
+
+int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
+                    struct obdo *oa, int objcount, struct obd_ioobj *obj,
+                    struct niobuf_remote *rnb, int npages,
+                    struct niobuf_local *lnb, int old_rc);
+int mdt_punch_hdl(struct tgt_session_info *tsi);
+
+/* grants */
+long mdt_grant_connect(const struct lu_env *env, struct obd_export *exp,
+                      u64 want, bool conservative);
+
 #endif /* _MDT_INTERNAL_H */
 #endif /* _MDT_INTERNAL_H */
diff --git a/lustre/mdt/mdt_io.c b/lustre/mdt/mdt_io.c
new file mode 100644 (file)
index 0000000..954713e
--- /dev/null
@@ -0,0 +1,614 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012, 2017 Intel Corporation.
+ */
+/*
+ * lustre/mdt/mdt_io.c
+ *
+ * Author: Mikhail Pershin <mike.pershin@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include <dt_object.h>
+#include "mdt_internal.h"
+
+/* --------------- MDT grant code ---------------- */
+
+/**
+ * Compute the grant handed to a client.
+ *
+ * Placeholder grant logic: the statfs data cached in mdt_osfs is refreshed
+ * from the bottom device and half of the available space is offered.
+ * \a want is only logged and \a conservative is not used.
+ *
+ * NOTE(review): the dt_statfs() return value is ignored, so a failed statfs
+ * leaves whatever was in mdt_osfs before - confirm this is acceptable.
+ *
+ * \param[in] env		execution environment
+ * \param[in] exp		client export
+ * \param[in] want		amount requested by the client (logged only)
+ * \param[in] conservative	unused
+ *
+ * \retval			grant in bytes
+ */
+long mdt_grant_connect(const struct lu_env *env,
+                      struct obd_export *exp,
+                      u64 want, bool conservative)
+{
+       struct mdt_device *mdt = mdt_exp2dev(exp);
+       u64 left;
+       long grant;
+
+       ENTRY;
+
+       dt_statfs(env, mdt->mdt_bottom, &mdt->mdt_osfs);
+
+       left = (mdt->mdt_osfs.os_bavail * mdt->mdt_osfs.os_bsize) / 2;
+
+       grant = left;
+
+       CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %ld want: %llu left: %llu\n",
+              exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
+              exp, grant, want, left);
+
+       /* pair the ENTRY above so that leaving the function is traced too */
+       RETURN(grant);
+}
+
+/**
+ * Fill in the grant returned to the client with a write reply.
+ *
+ * Placeholder grant logic: half of the available space, computed from the
+ * statfs data cached in mdt_osfs, is stored in \a oa->o_grant.
+ * \a env, \a rnb and \a niocount are not used.
+ *
+ * \param[in] env	execution environment
+ * \param[in] exp	client export
+ * \param[in,out] oa	obdo of the request, o_grant is overwritten
+ * \param[in] rnb	remote buffers of the request (unused)
+ * \param[in] niocount	number of remote buffers (unused)
+ */
+void mdt_grant_prepare_write(const struct lu_env *env,
+                            struct obd_export *exp, struct obdo *oa,
+                            struct niobuf_remote *rnb, int niocount)
+{
+       struct mdt_device *mdt = mdt_exp2dev(exp);
+       u64 left;
+
+       ENTRY;
+
+       left = (mdt->mdt_osfs.os_bavail * mdt->mdt_osfs.os_bsize) / 2;
+
+       /* grant more space back to the client if possible */
+       oa->o_grant = left;
+
+       /* pair the ENTRY above */
+       EXIT;
+}
+/* ---------------- end of MDT grant code ---------------- */
+
+/* functions below are stubs for now, they will be implemented with
+ * grant support on MDT */
+/* Stub: IO statistics are not accounted yet, every argument is ignored. */
+static inline void mdt_io_counter_incr(struct obd_export *exp, int opcode,
+                                      char *jobid, long amount)
+{
+}
+
+/* Stub: grant information of a read request is not processed yet. */
+void mdt_grant_prepare_read(const struct lu_env *env,
+                           struct obd_export *exp, struct obdo *oa)
+{
+}
+
+/* Stub: nothing is committed to grant accounting yet. */
+void mdt_grant_commit(struct obd_export *exp, unsigned long pending,
+                     int rc)
+{
+}
+
+/* Take the per-object DOM semaphore (mot_dom_sem) in shared mode. */
+static inline void mdt_dom_read_lock(struct mdt_object *mo)
+{
+       down_read(&mo->mot_dom_sem);
+}
+
+/* Release the per-object DOM semaphore taken by mdt_dom_read_lock(). */
+static inline void mdt_dom_read_unlock(struct mdt_object *mo)
+{
+       up_read(&mo->mot_dom_sem);
+}
+
+/* Take the per-object DOM semaphore (mot_dom_sem) in exclusive mode. */
+static inline void mdt_dom_write_lock(struct mdt_object *mo)
+{
+       down_write(&mo->mot_dom_sem);
+}
+
+/* Release the per-object DOM semaphore taken by mdt_dom_write_lock(). */
+static inline void mdt_dom_write_unlock(struct mdt_object *mo)
+{
+       up_write(&mo->mot_dom_sem);
+}
+
+/**
+ * Prepare local buffers for a bulk read.
+ *
+ * Takes the DOM semaphore of \a mo in shared mode, maps each remote buffer
+ * to local buffers, fetches the object attributes into \a la and prepares
+ * the local buffers for reading. On success the semaphore stays held and the
+ * buffers stay referenced; mdt_commitrw_read() releases both. On failure
+ * everything acquired here is released before returning.
+ *
+ * \param[in] env	execution environment
+ * \param[in] exp	client export
+ * \param[in] mdt	MDT device (unused here)
+ * \param[in] mo	object to read from
+ * \param[out] la	attributes of the object
+ * \param[in] niocount	number of remote buffers
+ * \param[in] rnb	remote buffers
+ * \param[out] nr_local	number of local buffers filled in
+ * \param[out] lnb	local buffers
+ * \param[in] jobid	job ID passed to the IO counter
+ *
+ * \retval		0 on success
+ * \retval		negative errno on failure
+ */
+static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp,
+                          struct mdt_device *mdt, struct mdt_object *mo,
+                          struct lu_attr *la, int niocount,
+                          struct niobuf_remote *rnb, int *nr_local,
+                          struct niobuf_local *lnb, char *jobid)
+{
+       struct dt_object *dob;
+       int total = 0;
+       int idx;
+       int rc;
+
+       ENTRY;
+
+       mdt_dom_read_lock(mo);
+       if (!mdt_object_exists(mo))
+               GOTO(unlock, rc = -ENOENT);
+
+       dob = mdt_obj2dt(mo);
+
+       /* map every remote buffer; *nr_local doubles as the index of the
+        * next free local buffer */
+       *nr_local = 0;
+       for (idx = 0; idx < niocount; idx++) {
+               rc = dt_bufs_get(env, dob, &rnb[idx], &lnb[*nr_local], 0);
+               if (unlikely(rc < 0))
+                       GOTO(buf_put, rc);
+               *nr_local += rc;
+               total += rnb[idx].rnb_len;
+       }
+
+       rc = dt_attr_get(env, dob, la);
+       if (unlikely(rc))
+               GOTO(buf_put, rc);
+
+       rc = dt_read_prep(env, dob, lnb, *nr_local);
+       if (unlikely(rc))
+               GOTO(buf_put, rc);
+
+       mdt_io_counter_incr(exp, LPROC_MDT_IO_READ, jobid, total);
+       RETURN(0);
+buf_put:
+       dt_bufs_put(env, dob, lnb, *nr_local);
+unlock:
+       mdt_dom_read_unlock(mo);
+       return rc;
+}
+
+/**
+ * Prepare local buffers for a bulk write.
+ *
+ * Processes the grant information, takes the DOM semaphore of \a mo in
+ * shared mode (mdt_punch_hdl() takes it exclusively), maps each remote buffer
+ * to local buffers carrying the remote flags and prepares them for writing.
+ * On success the semaphore stays held and the buffers stay referenced;
+ * mdt_commitrw_write() releases both. On failure everything acquired here
+ * is released and the grant is committed with the error.
+ *
+ * \param[in] env	execution environment
+ * \param[in] exp	client export
+ * \param[in] mdt	MDT device (unused here)
+ * \param[in] mo	object to write to
+ * \param[in] la	attributes (unused here)
+ * \param[in,out] oa	obdo of the request, grant fields are updated
+ * \param[in] objcount	number of objects (unused here)
+ * \param[in] obj	IO object, ioo_bufcnt is the remote buffer count
+ * \param[in] rnb	remote buffers
+ * \param[out] nr_local	number of local buffers filled in
+ * \param[out] lnb	local buffers
+ * \param[in] jobid	job ID passed to the IO counter
+ *
+ * \retval		0 on success
+ * \retval		negative errno on failure
+ */
+static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp,
+                           struct mdt_device *mdt, struct mdt_object *mo,
+                           struct lu_attr *la, struct obdo *oa,
+                           int objcount, struct obd_ioobj *obj,
+                           struct niobuf_remote *rnb, int *nr_local,
+                           struct niobuf_local *lnb, char *jobid)
+{
+       struct dt_object *dob;
+       int i, j, k, rc = 0, tot_bytes = 0;
+
+       ENTRY;
+
+       /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
+        * space back if possible */
+       mdt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
+
+       mdt_dom_read_lock(mo);
+       if (!mdt_object_exists(mo)) {
+               CDEBUG(D_ERROR, "%s: BRW to missing obj "DFID"\n",
+                      exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)));
+               GOTO(unlock, rc = -ENOENT);
+       }
+
+       dob = mdt_obj2dt(mo);
+       /* parse remote buffers to local buffers and prepare the latter */
+       *nr_local = 0;
+       for (i = 0, j = 0; i < obj->ioo_bufcnt; i++) {
+               rc = dt_bufs_get(env, dob, rnb + i, lnb + j, 1);
+               if (unlikely(rc < 0))
+                       GOTO(err, rc);
+               /* propagate the remote flags to each mapped local buffer */
+               for (k = 0; k < rc; k++)
+                       lnb[j+k].lnb_flags = rnb[i].rnb_flags;
+               /* correct index for local buffers to continue with */
+               j += rc;
+               *nr_local += rc;
+               tot_bytes += rnb[i].rnb_len;
+       }
+
+       rc = dt_write_prep(env, dob, lnb, *nr_local);
+       /* failure is the rare case here, hint the branch accordingly */
+       if (unlikely(rc))
+               GOTO(err, rc);
+
+       mdt_io_counter_incr(exp, LPROC_MDT_IO_WRITE, jobid, tot_bytes);
+       RETURN(0);
+err:
+       dt_bufs_put(env, dob, lnb, *nr_local);
+unlock:
+       mdt_dom_read_unlock(mo);
+       /* mdt_grant_prepare_write() was called, so we must commit */
+       mdt_grant_commit(exp, oa->o_grant_used, rc);
+       /* let's still process incoming grant information packed in the oa,
+        * but without enforcing grant since we won't proceed with the write.
+        * Just like a read request actually. */
+       mdt_grant_prepare_read(env, exp, oa);
+       return rc;
+}
+
+/**
+ * Prepare a bulk read or write on an MDT object.
+ *
+ * Validates the request, looks up the object by the FID of the session and
+ * stores it in the thread info (mti_object) so that mdt_obd_commitrw() can
+ * pick it up, then dispatches to mdt_preprw_read() or mdt_preprw_write().
+ * If preparation fails the object reference is dropped here.
+ *
+ * \param[in] env		execution environment
+ * \param[in] cmd		OBD_BRW_READ or OBD_BRW_WRITE
+ * \param[in] exp		client export
+ * \param[in,out] oa		obdo of the request
+ * \param[in] objcount		number of objects, must be 1
+ * \param[in] obj		IO object
+ * \param[in] rnb		remote buffers
+ * \param[in,out] nr_local	in: capacity of \a lnb, clamped to
+ *				MD_MAX_BRW_PAGES; out: local buffers used
+ * \param[out] lnb		local buffers
+ *
+ * \retval			0 on success
+ * \retval			-EPROTO on malformed request or unknown \a cmd
+ * \retval			other negative errno on failure
+ */
+int mdt_obd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
+                  struct obdo *oa, int objcount, struct obd_ioobj *obj,
+                  struct niobuf_remote *rnb, int *nr_local,
+                  struct niobuf_local *lnb)
+{
+       struct tgt_session_info *tsi = tgt_ses_info(env);
+       struct mdt_thread_info *info = tsi2mdt_info(tsi);
+       struct lu_attr *la = &info->mti_attr.ma_attr;
+       struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
+       struct mdt_object *mo;
+       char *jobid;
+       int rc = 0;
+
+       ENTRY;
+
+       /* The default value PTLRPC_MAX_BRW_PAGES is set in tgt_brw_write()
+        * but for MDT it is different, correct it here. */
+       if (*nr_local > MD_MAX_BRW_PAGES)
+               *nr_local = MD_MAX_BRW_PAGES;
+
+       jobid = tsi->tsi_jobid;
+
+       if (!oa || objcount != 1 || obj->ioo_bufcnt == 0) {
+               CERROR("%s: bad parameters %p/%i/%i\n",
+                      exp->exp_obd->obd_name, oa, objcount, obj->ioo_bufcnt);
+               /* stop here: continuing would overwrite this error below
+                * and dereference a possibly NULL oa */
+               GOTO(out, rc = -EPROTO);
+       }
+
+       mo = mdt_object_find(env, mdt, &tsi->tsi_fid);
+       if (IS_ERR(mo))
+               GOTO(out, rc = PTR_ERR(mo));
+
+       LASSERT(info->mti_object == NULL);
+       info->mti_object = mo;
+
+       if (cmd == OBD_BRW_WRITE) {
+               la_from_obdo(la, oa, OBD_MD_FLGETATTR);
+               rc = mdt_preprw_write(env, exp, mdt, mo, la, oa,
+                                     objcount, obj, rnb, nr_local, lnb,
+                                     jobid);
+       } else if (cmd == OBD_BRW_READ) {
+               mdt_grant_prepare_read(env, exp, oa);
+               rc = mdt_preprw_read(env, exp, mdt, mo, la,
+                                    obj->ioo_bufcnt, rnb, nr_local, lnb,
+                                    jobid);
+               obdo_from_la(oa, la, LA_ATIME);
+       } else {
+               CERROR("%s: wrong cmd %d received!\n",
+                      exp->exp_obd->obd_name, cmd);
+               rc = -EPROTO;
+       }
+       if (rc) {
+               /* commitrw will not see this object, drop it now */
+               lu_object_put(env, &mo->mot_obj);
+               info->mti_object = NULL;
+       }
+out:
+       RETURN(rc);
+}
+
+/**
+ * Finish a bulk read: release the local buffers and drop the DOM semaphore
+ * that mdt_preprw_read() left held.
+ *
+ * \param[in] env	execution environment
+ * \param[in] mdt	MDT device (unused here)
+ * \param[in] mo	object that was read
+ * \param[in] objcount	number of objects (unused here)
+ * \param[in] niocount	number of local buffers, must be positive
+ * \param[in] lnb	local buffers
+ *
+ * \retval		0 always
+ */
+static int mdt_commitrw_read(const struct lu_env *env, struct mdt_device *mdt,
+                            struct mdt_object *mo, int objcount, int niocount,
+                            struct niobuf_local *lnb)
+{
+       ENTRY;
+
+       LASSERT(niocount > 0);
+
+       dt_bufs_put(env, mdt_obj2dt(mo), lnb, niocount);
+       mdt_dom_read_unlock(mo);
+
+       RETURN(0);
+}
+
+/**
+ * Commit a bulk write to the object inside a transaction.
+ *
+ * Writes the local buffers, optionally updates the timestamps found in \a la
+ * and reads the resulting attributes back into \a la. A transaction failing
+ * with -ENOSPC is stopped synchronously and the whole sequence is retried
+ * up to three times. On every path the local buffers are released, the DOM
+ * semaphore left held by mdt_preprw_write() is dropped and the grant is
+ * committed with \a old_rc.
+ *
+ * \param[in] env	execution environment
+ * \param[in] exp	client export
+ * \param[in] mdt	MDT device
+ * \param[in] mo	object being written
+ * \param[in,out] la	in: timestamps to set; out: attributes after write
+ * \param[in] objcount	number of objects (unused here)
+ * \param[in] niocount	number of local buffers
+ * \param[in] lnb	local buffers
+ * \param[in] granted	grant amount passed to mdt_grant_commit()
+ * \param[in] old_rc	result of the preceding bulk transfer
+ *
+ * \retval		0 on success
+ * \retval		\a old_rc if it is non-zero
+ * \retval		negative errno on failure
+ */
+static int mdt_commitrw_write(const struct lu_env *env, struct obd_export *exp,
+                             struct mdt_device *mdt, struct mdt_object *mo,
+                             struct lu_attr *la, int objcount, int niocount,
+                             struct niobuf_local *lnb, unsigned long granted,
+                             int old_rc)
+{
+       struct dt_device *dt = mdt->mdt_bottom;
+       struct dt_object *dob;
+       struct thandle *th;
+       int rc = 0;
+       int retries = 0;
+       int i;
+
+       ENTRY;
+
+       dob = mdt_obj2dt(mo);
+
+       /* the transfer already failed: nothing to write, only clean up */
+       if (old_rc)
+               GOTO(out, rc = old_rc);
+
+       /* only the timestamps are applied from the incoming attributes */
+       la->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
+retry:
+       if (!dt_object_exists(dob))
+               GOTO(out, rc = -ENOENT);
+
+       th = dt_trans_create(env, dt);
+       if (IS_ERR(th))
+               GOTO(out, rc = PTR_ERR(th));
+
+       /* a single buffer without OBD_BRW_ASYNC makes the transaction sync */
+       for (i = 0; i < niocount; i++) {
+               if (!(lnb[i].lnb_flags & OBD_BRW_ASYNC)) {
+                       th->th_sync = 1;
+                       break;
+               }
+       }
+
+       /* fault injection point */
+       if (OBD_FAIL_CHECK(OBD_FAIL_OST_DQACQ_NET))
+               GOTO(out_stop, rc = -EINPROGRESS);
+
+       rc = dt_declare_write_commit(env, dob, lnb, niocount, th);
+       if (rc)
+               GOTO(out_stop, rc);
+
+       if (la->la_valid) {
+               /* update [mac]time if needed */
+               rc = dt_declare_attr_set(env, dob, la, th);
+               if (rc)
+                       GOTO(out_stop, rc);
+       }
+
+       rc = dt_trans_start(env, dt, th);
+       if (rc)
+               GOTO(out_stop, rc);
+
+       dt_write_lock(env, dob, 0);
+       rc = dt_write_commit(env, dob, lnb, niocount, th);
+       if (rc)
+               GOTO(unlock, rc);
+
+       if (la->la_valid) {
+               rc = dt_attr_set(env, dob, la, th);
+               if (rc)
+                       GOTO(unlock, rc);
+       }
+       /* get attr to return */
+       rc = dt_attr_get(env, dob, la);
+unlock:
+       dt_write_unlock(env, dob);
+
+out_stop:
+       /* Force commit to make the just-deleted blocks
+        * reusable. LU-456 */
+       if (rc == -ENOSPC)
+               th->th_sync = 1;
+
+       th->th_result = rc;
+       dt_trans_stop(env, dt, th);
+       /* retry with a fresh transaction after the forced commit above */
+       if (rc == -ENOSPC && retries++ < 3) {
+               CDEBUG(D_INODE, "retry after force commit, retries:%d\n",
+                      retries);
+               goto retry;
+       }
+
+out:
+       /* undo what mdt_preprw_write() left held on success */
+       dt_bufs_put(env, dob, lnb, niocount);
+       mdt_dom_read_unlock(mo);
+       mdt_grant_commit(exp, granted, old_rc);
+       RETURN(rc);
+}
+
+/**
+ * Finish a bulk read or write started by mdt_obd_preprw().
+ *
+ * Uses the object stored in the thread info (mti_object) by
+ * mdt_obd_preprw(). For writes the data is committed, attributes are
+ * packed into \a oa and the over-quota flags of the first local buffer are
+ * reported back; for reads the buffers are just released. The thread info
+ * is finalized before returning.
+ *
+ * NOTE(review): for npages == 0 the -EPROTO set below is overwritten by the
+ * write path and the read path asserts niocount > 0. Bailing out early is
+ * not an option either since the DOM semaphore taken in preprw must still be
+ * released - needs a decision.
+ *
+ * \param[in] env	execution environment
+ * \param[in] cmd	OBD_BRW_READ or OBD_BRW_WRITE
+ * \param[in] exp	client export
+ * \param[in,out] oa	obdo of the request/reply
+ * \param[in] objcount	number of objects
+ * \param[in] obj	IO object (unused here)
+ * \param[in] rnb	remote buffers (unused here)
+ * \param[in] npages	number of local buffers
+ * \param[in] lnb	local buffers
+ * \param[in] old_rc	result of the preceding bulk transfer
+ *
+ * \retval		0 on success
+ * \retval		negative errno on failure
+ */
+int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
+                    struct obdo *oa, int objcount, struct obd_ioobj *obj,
+                    struct niobuf_remote *rnb, int npages,
+                    struct niobuf_local *lnb, int old_rc)
+{
+       struct mdt_thread_info *info = mdt_th_info(env);
+       struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
+       struct mdt_object *mo = info->mti_object;
+       struct lu_attr *la = &info->mti_attr.ma_attr;
+       __u64 valid;
+       int rc = 0;
+
+       ENTRY;
+
+       if (npages == 0) {
+               CERROR("%s: no pages to commit\n",
+                      exp->exp_obd->obd_name);
+               rc = -EPROTO;
+       }
+
+       LASSERT(mo);
+
+       if (cmd == OBD_BRW_WRITE) {
+               /* Don't update timestamps if this write is older than a
+                * setattr which modifies the timestamps. b=10150 */
+
+               /* XXX when we start having persistent reservations this needs
+                * to be changed to ofd_fmd_get() to create the fmd if it
+                * doesn't already exist so we can store the reservation handle
+                * there. */
+               valid = OBD_MD_FLUID | OBD_MD_FLGID;
+               valid |= OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+               la_from_obdo(la, oa, valid);
+
+               rc = mdt_commitrw_write(env, exp, mdt, mo, la, objcount,
+                                       npages, lnb, oa->o_grant_used, old_rc);
+               if (rc == 0)
+                       obdo_from_la(oa, la, VALID_FLAGS | LA_GID | LA_UID);
+               else
+                       obdo_from_la(oa, la, LA_GID | LA_UID);
+
+               /* don't report overquota flag if we failed before reaching
+                * commit */
+               if (old_rc == 0 && (rc == 0 || rc == -EDQUOT)) {
+                       /* return the overquota flags to client */
+                       if (lnb[0].lnb_flags & OBD_BRW_OVER_USRQUOTA) {
+                               if (oa->o_valid & OBD_MD_FLFLAGS)
+                                       oa->o_flags |= OBD_FL_NO_USRQUOTA;
+                               else
+                                       oa->o_flags = OBD_FL_NO_USRQUOTA;
+                       }
+
+                       if (lnb[0].lnb_flags & OBD_BRW_OVER_GRPQUOTA) {
+                               if (oa->o_valid & OBD_MD_FLFLAGS)
+                                       oa->o_flags |= OBD_FL_NO_GRPQUOTA;
+                               else
+                                       oa->o_flags = OBD_FL_NO_GRPQUOTA;
+                       }
+
+                       oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLUSRQUOTA |
+                                      OBD_MD_FLGRPQUOTA;
+               }
+       } else if (cmd == OBD_BRW_READ) {
+               rc = mdt_commitrw_read(env, mdt, mo, objcount, npages, lnb);
+               if (old_rc)
+                       rc = old_rc;
+       } else {
+               rc = -EPROTO;
+       }
+       /* finish the thread info set up in mdt_obd_preprw(); presumably this
+        * also drops the mti_object reference taken there by
+        * mdt_object_find() - confirm in mdt_thread_info_fini() */
+       mdt_thread_info_fini(info);
+       RETURN(rc);
+}
+
+/**
+ * Truncate an object to \a start and set its attributes in one transaction.
+ *
+ * \param[in] env	execution environment
+ * \param[in] dt	device the transaction is created on
+ * \param[in] dob	object to truncate
+ * \param[in] start	new end of the object data
+ * \param[in] end	must be OBD_OBJECT_EOF (asserted)
+ * \param[in] la	attributes to set together with the truncate
+ *
+ * \retval		0 on success
+ * \retval		-ENOENT if the object does not exist
+ * \retval		other negative errno on failure
+ */
+int mdt_object_punch(const struct lu_env *env, struct dt_device *dt,
+                    struct dt_object *dob, __u64 start, __u64 end,
+                    struct lu_attr *la)
+{
+       struct thandle *th;
+       int rc;
+
+       ENTRY;
+
+       /* we support truncate, not punch yet */
+       LASSERT(end == OBD_OBJECT_EOF);
+
+       if (!dt_object_exists(dob))
+               RETURN(-ENOENT);
+
+       th = dt_trans_create(env, dt);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       /* declare both updates before the transaction is started */
+       rc = dt_declare_attr_set(env, dob, la, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       tgt_vbr_obj_set(env, dob);
+       rc = dt_trans_start(env, dt, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       /* truncate and attribute update happen under the object write lock */
+       dt_write_lock(env, dob, 0);
+       rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
+       if (rc)
+               GOTO(unlock, rc);
+       rc = dt_attr_set(env, dob, la, th);
+       if (rc)
+               GOTO(unlock, rc);
+unlock:
+       dt_write_unlock(env, dob);
+stop:
+       /* the transaction is stopped on every path once it was created */
+       th->th_result = rc;
+       dt_trans_stop(env, dt, th);
+       RETURN(rc);
+}
+
+/**
+ * Handler of a punch (truncate) request for an MDT object.
+ *
+ * Only truncate to end-of-file is supported: the request must carry both
+ * OBD_MD_FLSIZE and OBD_MD_FLBLOCKS and o_blocks must be OBD_OBJECT_EOF.
+ * If the client asked for server-side locking a PW data lock is taken first.
+ * The object is truncated under its DOM semaphore held exclusively, and on
+ * success the LVB of the resource is updated.
+ *
+ * \param[in] tsi	target session info of the request
+ *
+ * \retval		0 on success
+ * \retval		-EPROTO on malformed or unsupported request
+ * \retval		-ENOENT if the object does not exist
+ * \retval		other negative errno on failure
+ */
+int mdt_punch_hdl(struct tgt_session_info *tsi)
+{
+       const struct obdo *oa = &tsi->tsi_ost_body->oa;
+       struct ost_body *repbody;
+       struct mdt_thread_info *info;
+       struct lu_attr *la;
+       struct ldlm_namespace *ns = tsi->tsi_tgt->lut_obd->obd_namespace;
+       struct obd_export *exp = tsi->tsi_exp;
+       struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
+       struct mdt_object *mo;
+       struct dt_object *dob;
+       __u64 flags = 0;
+       struct lustre_handle lh = { 0, };
+       __u64 start, end;
+       int rc;
+       bool srvlock;
+
+       ENTRY;
+
+       /* check that we do support OBD_CONNECT_TRUNCLOCK. */
+       CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
+
+       if ((oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
+           (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
+               RETURN(err_serious(-EPROTO));
+
+       repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+       if (repbody == NULL)
+               RETURN(err_serious(-ENOMEM));
+
+       /* punch start,end are passed in o_size,o_blocks through the wire */
+       start = oa->o_size;
+       end = oa->o_blocks;
+
+       if (end != OBD_OBJECT_EOF) /* Only truncate is supported */
+               RETURN(-EPROTO);
+
+       info = tsi2mdt_info(tsi);
+       la = &info->mti_attr.ma_attr;
+       /* standard truncate optimization: if file body is completely
+        * destroyed, don't send data back to the server. */
+       if (start == 0)
+               flags |= LDLM_FL_AST_DISCARD_DATA;
+
+       repbody->oa.o_oi = oa->o_oi;
+       repbody->oa.o_valid = OBD_MD_FLID;
+
+       srvlock = (exp_connect_flags(exp) & OBD_CONNECT_SRVLOCK) &&
+                 oa->o_valid & OBD_MD_FLFLAGS &&
+                 oa->o_flags & OBD_FL_SRVLOCK;
+
+       if (srvlock) {
+               rc = tgt_mdt_data_lock(ns, &tsi->tsi_resid, &lh, LCK_PW,
+                                      &flags);
+               if (rc != 0)
+                       GOTO(out, rc);
+       }
+
+       CDEBUG(D_INODE, "calling punch for object "DFID", valid = %#llx"
+              ", start = %lld, end = %lld\n", PFID(&tsi->tsi_fid),
+              oa->o_valid, start, end);
+
+       mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
+       if (IS_ERR(mo))
+               GOTO(out_unlock, rc = PTR_ERR(mo));
+
+       mdt_dom_write_lock(mo);
+       if (!mdt_object_exists(mo)) {
+               /* the DOM semaphore must not stay held on this error path,
+                * out_put only drops the object reference */
+               mdt_dom_write_unlock(mo);
+               GOTO(out_put, rc = -ENOENT);
+       }
+       dob = mdt_obj2dt(mo);
+
+       la_from_obdo(la, oa, OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME);
+       la->la_size = start;
+       la->la_valid |= LA_SIZE;
+
+       rc = mdt_object_punch(tsi->tsi_env, mdt->mdt_bottom, dob,
+                             start, end, la);
+       mdt_dom_write_unlock(mo);
+       if (rc)
+               GOTO(out_put, rc);
+
+       mdt_io_counter_incr(tsi->tsi_exp, LPROC_MDT_IO_PUNCH,
+                           tsi->tsi_jobid, 1);
+       EXIT;
+out_put:
+       lu_object_put(tsi->tsi_env, &mo->mot_obj);
+out_unlock:
+       if (srvlock)
+               mdt_save_lock(info, &lh, LCK_PW, rc);
+out:
+       mdt_thread_info_fini(info);
+       if (rc == 0) {
+               struct ldlm_resource *res;
+
+               /* we do not call this before to avoid lu_object_find() in
+                *  ->lvbo_update() holding another reference on the object.
+                * otherwise concurrent destroy can make the object unavailable
+                * for 2nd lu_object_find() waiting for the first reference
+                * to go... deadlock! */
+               res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid,
+                                       LDLM_IBITS, 0);
+               if (!IS_ERR(res)) {
+                       ldlm_res_lvbo_update(res, NULL, 0);
+                       ldlm_resource_putref(res);
+               }
+       }
+       return rc;
+}
+
index d0b37fb..c60742e 100644 (file)
@@ -894,6 +894,8 @@ void mdt_counter_incr(struct ptlrpc_request *req, int opcode)
 
 void mdt_stats_counter_init(struct lprocfs_stats *stats)
 {
 
 void mdt_stats_counter_init(struct lprocfs_stats *stats)
 {
+       LASSERT(stats && stats->ls_num >= LPROC_MDT_LAST);
+
         lprocfs_counter_init(stats, LPROC_MDT_OPEN, 0, "open", "reqs");
         lprocfs_counter_init(stats, LPROC_MDT_CLOSE, 0, "close", "reqs");
         lprocfs_counter_init(stats, LPROC_MDT_MKNOD, 0, "mknod", "reqs");
         lprocfs_counter_init(stats, LPROC_MDT_OPEN, 0, "open", "reqs");
         lprocfs_counter_init(stats, LPROC_MDT_CLOSE, 0, "close", "reqs");
         lprocfs_counter_init(stats, LPROC_MDT_MKNOD, 0, "mknod", "reqs");
@@ -912,6 +914,11 @@ void mdt_stats_counter_init(struct lprocfs_stats *stats)
                              "samedir_rename", "reqs");
         lprocfs_counter_init(stats, LPROC_MDT_CROSSDIR_RENAME, 0,
                              "crossdir_rename", "reqs");
                              "samedir_rename", "reqs");
         lprocfs_counter_init(stats, LPROC_MDT_CROSSDIR_RENAME, 0,
                              "crossdir_rename", "reqs");
+       lprocfs_counter_init(stats, LPROC_MDT_IO_READ,
+                            LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
+       lprocfs_counter_init(stats, LPROC_MDT_IO_WRITE,
+                            LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
+       lprocfs_counter_init(stats, LPROC_MDT_IO_PUNCH, 0, "punch", "reqs");
 }
 
 int mdt_procfs_init(struct mdt_device *mdt, const char *name)
 }
 
 int mdt_procfs_init(struct mdt_device *mdt, const char *name)
index a06e102..078051a 100644 (file)
@@ -64,6 +64,7 @@ struct mds_device {
        struct ptlrpc_service   *mds_mdsc_service;
        struct ptlrpc_service   *mds_mdss_service;
        struct ptlrpc_service   *mds_fld_service;
        struct ptlrpc_service   *mds_mdsc_service;
        struct ptlrpc_service   *mds_mdss_service;
        struct ptlrpc_service   *mds_fld_service;
+       struct ptlrpc_service   *mds_io_service;
        struct mutex             mds_health_mutex;
        struct kset             *mds_kset;
 };
        struct mutex             mds_health_mutex;
        struct kset             *mds_kset;
 };
@@ -75,6 +76,10 @@ static unsigned long mds_num_threads;
 module_param(mds_num_threads, ulong, 0444);
 MODULE_PARM_DESC(mds_num_threads, "number of MDS service threads to start");
 
 module_param(mds_num_threads, ulong, 0444);
 MODULE_PARM_DESC(mds_num_threads, "number of MDS service threads to start");
 
+int mds_max_io_threads = 512;
+module_param(mds_max_io_threads, int, 0444);
+MODULE_PARM_DESC(mds_max_io_threads, "maximum number of MDS IO service threads");
+
 static char *mds_num_cpts;
 module_param(mds_num_cpts, charp, 0444);
 MODULE_PARM_DESC(mds_num_cpts, "CPU partitions MDS threads should run on");
 static char *mds_num_cpts;
 module_param(mds_num_cpts, charp, 0444);
 MODULE_PARM_DESC(mds_num_cpts, "CPU partitions MDS threads should run on");
@@ -134,6 +139,10 @@ static void mds_stop_ptlrpc_service(struct mds_device *m)
                ptlrpc_unregister_service(m->mds_fld_service);
                m->mds_fld_service = NULL;
        }
                ptlrpc_unregister_service(m->mds_fld_service);
                m->mds_fld_service = NULL;
        }
+       if (m->mds_io_service != NULL) {
+               ptlrpc_unregister_service(m->mds_io_service);
+               m->mds_io_service = NULL;
+       }
        mutex_unlock(&m->mds_health_mutex);
 
        EXIT;
        mutex_unlock(&m->mds_health_mutex);
 
        EXIT;
@@ -440,6 +449,43 @@ static int mds_start_ptlrpc_service(struct mds_device *m)
                GOTO(err_mds_svc, rc);
        }
 
                GOTO(err_mds_svc, rc);
        }
 
+       memset(&conf, 0, sizeof(conf));
+       conf = (typeof(conf)) {
+               .psc_name               = LUSTRE_MDT_NAME "_io",
+               .psc_watchdog_factor    = MDT_SERVICE_WATCHDOG_FACTOR,
+               .psc_buf                = {
+                       .bc_nbufs               = OST_NBUFS,
+                       .bc_buf_size            = OST_IO_BUFSIZE,
+                       .bc_req_max_size        = OST_IO_MAXREQSIZE,
+                       .bc_rep_max_size        = OST_IO_MAXREPSIZE,
+                       .bc_req_portal          = MDS_IO_PORTAL,
+                       .bc_rep_portal          = MDC_REPLY_PORTAL,
+               },
+               .psc_thr                = {
+                       .tc_thr_name            = "ll_mdt_io",
+                       .tc_thr_factor          = OSS_THR_FACTOR,
+                       .tc_nthrs_init          = OSS_NTHRS_INIT,
+                       .tc_nthrs_base          = OSS_NTHRS_BASE,
+                       .tc_nthrs_max           = mds_max_io_threads,
+                       .tc_cpu_affinity        = 1,
+                       .tc_ctx_tags            = LCT_DT_THREAD | LCT_MD_THREAD,
+               },
+               .psc_ops                = {
+                       .so_thr_init            = tgt_io_thread_init,
+                       .so_thr_done            = tgt_io_thread_done,
+                       .so_req_handler         = tgt_request_handle,
+                       .so_req_printer         = target_print_req,
+               },
+       };
+       m->mds_io_service = ptlrpc_register_service(&conf, m->mds_kset,
+                                                   procfs_entry);
+       if (IS_ERR(m->mds_io_service)) {
+               rc = PTR_ERR(m->mds_io_service);
+               CERROR("failed to start MDT I/O service: %d\n", rc);
+               m->mds_io_service = NULL;
+               GOTO(err_mds_svc, rc);
+       }
+
        EXIT;
 err_mds_svc:
        if (rc)
        EXIT;
 err_mds_svc:
        if (rc)
@@ -554,6 +600,7 @@ static int mds_health_check(const struct lu_env *env, struct obd_device *obd)
        rc |= ptlrpc_service_health_check(mds->mds_mdsc_service);
        rc |= ptlrpc_service_health_check(mds->mds_mdss_service);
        rc |= ptlrpc_service_health_check(mds->mds_fld_service);
        rc |= ptlrpc_service_health_check(mds->mds_mdsc_service);
        rc |= ptlrpc_service_health_check(mds->mds_mdss_service);
        rc |= ptlrpc_service_health_check(mds->mds_fld_service);
+       rc |= ptlrpc_service_health_check(mds->mds_io_service);
        mutex_unlock(&mds->mds_health_mutex);
 
        return rc != 0 ? 1 : 0;
        mutex_unlock(&mds->mds_health_mutex);
 
        return rc != 0 ? 1 : 0;
index 5f259e9..d34c30e 100644 (file)
@@ -902,7 +902,7 @@ static int osc_del_shrink_grant(struct client_obd *client)
                                          TIMEOUT_GRANT);
 }
 
                                          TIMEOUT_GRANT);
 }
 
-static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
 {
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
 {
        /*
         * ocd_grant is the total grant amount we're expect to hold: if we've
@@ -959,6 +959,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
 }
            list_empty(&cli->cl_grant_shrink_list))
                osc_add_shrink_grant(cli);
 }
+EXPORT_SYMBOL(osc_init_grant);
 
 /* We assume that the reason this OSC got a short read is because it read
  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 
 /* We assume that the reason this OSC got a short read is because it read
  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
index cceeecf..e173964 100644 (file)
@@ -1415,8 +1415,7 @@ static dnode_t *osd_mkreg(const struct lu_env *env, struct osd_object *obj,
        if (rc)
                return ERR_PTR(rc);
 
        if (rc)
                return ERR_PTR(rc);
 
-       if ((fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid)) &&
-           osd->od_is_ost) {
+       if ((fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid))) {
                /* The minimum block size must be at least page size otherwise
                 * it will break the assumption in tgt_thread_big_cache where
                 * the array size is PTLRPC_MAX_BRW_PAGES. It will also affect
                /* The minimum block size must be at least page size otherwise
                 * it will break the assumption in tgt_thread_big_cache where
                 * the array size is PTLRPC_MAX_BRW_PAGES. It will also affect
index 8460774..9026f21 100644 (file)
@@ -1570,6 +1570,35 @@ void tgt_io_thread_done(struct ptlrpc_thread *thread)
        EXIT;
 }
 EXPORT_SYMBOL(tgt_io_thread_done);
        EXIT;
 }
 EXPORT_SYMBOL(tgt_io_thread_done);
+
+/**
+ * Take a server-side inodebits DLM lock for the data of a Data-on-MDT file
+ * when the client asked the server to do the locking.
+ *
+ * The lock covers the MDS_INODELOCK_DOM and MDS_INODELOCK_UPDATE bits of the
+ * resource. Any enqueue failure is reported as -EIO.
+ *
+ * \param[in] ns	namespace of the resource, must not be NULL
+ * \param[in] res_id	resource to lock
+ * \param[out] lh	lock handle, must be unused on entry
+ * \param[in] mode	lock mode
+ * \param[in,out] flags	LDLM flags passed to the enqueue
+ *
+ * \retval		0 if the lock was granted
+ * \retval		-EIO otherwise
+ */
+int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+                     struct lustre_handle *lh, int mode, __u64 *flags)
+{
+       union ldlm_policy_data ibits;
+       int rc;
+
+       ENTRY;
+
+       LASSERT(lh != NULL);
+       LASSERT(ns != NULL);
+       LASSERT(!lustre_handle_is_used(lh));
+
+       ibits.l_inodebits.try_bits = 0;
+       ibits.l_inodebits.bits = MDS_INODELOCK_DOM | MDS_INODELOCK_UPDATE;
+
+       rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &ibits, mode,
+                                   flags, ldlm_blocking_ast,
+                                   ldlm_completion_ast, ldlm_glimpse_ast,
+                                   NULL, 0, LVB_T_NONE, NULL, lh);
+       if (rc != ELDLM_OK)
+               RETURN(-EIO);
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(tgt_mdt_data_lock);
+
 /**
  * Helper function for getting server side [start, start+count] DLM lock
  * if asked by client.
 /**
  * Helper function for getting server side [start, start+count] DLM lock
  * if asked by client.
@@ -1614,13 +1643,15 @@ void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode)
 }
 EXPORT_SYMBOL(tgt_extent_unlock);
 
 }
 EXPORT_SYMBOL(tgt_extent_unlock);
 
-int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
-                struct obd_ioobj *obj, struct niobuf_remote *nb,
-                struct lustre_handle *lh, enum ldlm_mode mode)
+static int tgt_brw_lock(struct obd_export *exp, struct ldlm_res_id *res_id,
+                       struct obd_ioobj *obj, struct niobuf_remote *nb,
+                       struct lustre_handle *lh, enum ldlm_mode mode)
 {
 {
+       struct ldlm_namespace   *ns = exp->exp_obd->obd_namespace;
        __u64                    flags = 0;
        int                      nrbufs = obj->ioo_bufcnt;
        int                      i;
        __u64                    flags = 0;
        int                      nrbufs = obj->ioo_bufcnt;
        int                      i;
+       int                      rc;
 
        ENTRY;
 
 
        ENTRY;
 
@@ -1637,14 +1668,19 @@ int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
                if (!(nb[i].rnb_flags & OBD_BRW_SRVLOCK))
                        RETURN(-EFAULT);
 
                if (!(nb[i].rnb_flags & OBD_BRW_SRVLOCK))
                        RETURN(-EFAULT);
 
-       RETURN(tgt_extent_lock(ns, res_id, nb[0].rnb_offset,
-                              nb[nrbufs - 1].rnb_offset +
-                              nb[nrbufs - 1].rnb_len - 1,
-                              lh, mode, &flags));
+       /* MDT IO for data-on-mdt */
+       if (exp->exp_connect_data.ocd_connect_flags & OBD_CONNECT_IBITS)
+               rc = tgt_mdt_data_lock(ns, res_id, lh, mode, &flags);
+       else
+               rc = tgt_extent_lock(ns, res_id, nb[0].rnb_offset,
+                                    nb[nrbufs - 1].rnb_offset +
+                                    nb[nrbufs - 1].rnb_len - 1,
+                                    lh, mode, &flags);
+       RETURN(rc);
 }
 
 }
 
-void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
-                   struct lustre_handle *lh, enum ldlm_mode mode)
+static void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
+                          struct lustre_handle *lh, enum ldlm_mode mode)
 {
        ENTRY;
 
 {
        ENTRY;
 
@@ -1882,7 +1918,8 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 
        ENTRY;
 
 
        ENTRY;
 
-       if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
+       if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL &&
+           ptlrpc_req2svc(req)->srv_req_portal != MDS_IO_PORTAL) {
                CERROR("%s: deny read request from %s to portal %u\n",
                       tgt_name(tsi->tsi_tgt),
                       obd_export_nid2str(req->rq_export),
                CERROR("%s: deny read request from %s to portal %u\n",
                       tgt_name(tsi->tsi_tgt),
                       obd_export_nid2str(req->rq_export),
@@ -1925,8 +1962,8 @@ int tgt_brw_read(struct tgt_session_info *tsi)
 
        local_nb = tbc->local;
 
 
        local_nb = tbc->local;
 
-       rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
-                         remote_nb, &lockh, LCK_PR);
+       rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh,
+                         LCK_PR);
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
@@ -2136,7 +2173,8 @@ int tgt_brw_write(struct tgt_session_info *tsi)
 
        ENTRY;
 
 
        ENTRY;
 
-       if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
+       if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL &&
+           ptlrpc_req2svc(req)->srv_req_portal != MDS_IO_PORTAL) {
                CERROR("%s: deny write request from %s to portal %u\n",
                       tgt_name(tsi->tsi_tgt),
                       obd_export_nid2str(req->rq_export),
                CERROR("%s: deny write request from %s to portal %u\n",
                       tgt_name(tsi->tsi_tgt),
                       obd_export_nid2str(req->rq_export),
@@ -2200,8 +2238,8 @@ int tgt_brw_write(struct tgt_session_info *tsi)
 
        local_nb = tbc->local;
 
 
        local_nb = tbc->local;
 
-       rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
-                         remote_nb, &lockh, LCK_PW);
+       rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh,
+                         LCK_PW);
        if (rc != 0)
                GOTO(out, rc);
 
        if (rc != 0)
                GOTO(out, rc);