Add methods to handle IO requests in MDT similar to OFD.
Introduce MDS_INODELOCK_DOM bit for data on MDT, distinguish
IO requests to MDT and OST and take appropriate lock in
target code.
Change-Id: I7feaa00c381f821510ca1343b042ed5f09050ac6
Signed-off-by: Mikhal Pershin <mike.pershin@intel.com>
Reviewed-on: https://review.whamcloud.com/28013
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
Reviewed-by: Bobi Jam <bobijam@hotmail.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
int tgt_io_thread_init(struct ptlrpc_thread *thread);
void tgt_io_thread_done(struct ptlrpc_thread *thread);
+int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+ struct lustre_handle *lh, int mode, __u64 *flags);
int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
__u64 start, __u64 end, struct lustre_handle *lh,
int mode, __u64 *flags);
void tgt_extent_unlock(struct lustre_handle *lh, enum ldlm_mode mode);
-int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
- struct obd_ioobj *obj, struct niobuf_remote *nb,
- struct lustre_handle *lh, enum ldlm_mode mode);
-void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
- struct lustre_handle *lh, enum ldlm_mode mode);
int tgt_brw_read(struct tgt_session_info *tsi);
int tgt_brw_write(struct tgt_session_info *tsi);
int tgt_hpreq_handler(struct ptlrpc_request *req);
int osc_object_glimpse(const struct lu_env *env, const struct cl_object *obj,
struct ost_lvb *lvb);
+/* osc_request.c */
+void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd);
+
/*****************************************************************************
*
* Accessors and type conversions.
#define MDC_REPLY_PORTAL 10
//#define MDC_BULK_PORTAL 11
#define MDS_REQUEST_PORTAL 12
-//#define MDS_REPLY_PORTAL 13
+#define MDS_IO_PORTAL 13
#define MDS_BULK_PORTAL 14
#define LDLM_CB_REQUEST_PORTAL 15
#define LDLM_CB_REPLY_PORTAL 16
OBD_CONNECT_FLOCK_DEAD | \
OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK | \
OBD_CONNECT_OPEN_BY_FID | \
- OBD_CONNECT_DIR_STRIPE | \
- OBD_CONNECT_BULK_MBITS | \
+ OBD_CONNECT_DIR_STRIPE | OBD_CONNECT_GRANT | \
+ OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_SRVLOCK | \
+ OBD_CONNECT_BULK_MBITS | OBD_CONNECT_CKSUM | \
OBD_CONNECT_MULTIMODRPCS | \
OBD_CONNECT_SUBTREE | OBD_CONNECT_LARGE_ACL | \
OBD_CONNECT_FLAGS2)
* will grant LOOKUP_LOCK. */
#define MDS_INODELOCK_PERM 0x000010
#define MDS_INODELOCK_XATTR 0x000020 /* extended attributes */
+#define MDS_INODELOCK_DOM 0x000040 /* Data for data-on-mdt files */
-#define MDS_INODELOCK_MAXSHIFT 5
+#define MDS_INODELOCK_MAXSHIFT 6
/* This FULL lock is useful to take on unlink sort of operations */
#define MDS_INODELOCK_FULL ((1<<(MDS_INODELOCK_MAXSHIFT+1))-1)
}
/* indicate MDT features supported by this client */
- data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH |
- OBD_CONNECT_ATTRFID |
- OBD_CONNECT_VERSION | OBD_CONNECT_BRW_SIZE |
- OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA |
- OBD_CONNECT_CANCELSET | OBD_CONNECT_FID |
- OBD_CONNECT_AT | OBD_CONNECT_LOV_V3 |
+ data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH |
+ OBD_CONNECT_ATTRFID | OBD_CONNECT_GRANT |
+ OBD_CONNECT_VERSION | OBD_CONNECT_BRW_SIZE |
+ OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA |
+ OBD_CONNECT_CANCELSET | OBD_CONNECT_FID |
+ OBD_CONNECT_AT | OBD_CONNECT_LOV_V3 |
OBD_CONNECT_VBR | OBD_CONNECT_FULL20 |
OBD_CONNECT_64BITHASH |
OBD_CONNECT_EINPROGRESS |
OBD_CONNECT_DISP_STRIPE | OBD_CONNECT_LFSCK |
OBD_CONNECT_OPEN_BY_FID |
OBD_CONNECT_DIR_STRIPE |
- OBD_CONNECT_BULK_MBITS |
+ OBD_CONNECT_BULK_MBITS | OBD_CONNECT_CKSUM |
OBD_CONNECT_SUBTREE |
OBD_CONNECT_FLAGS2 | OBD_CONNECT_MULTIMODRPCS;
OBD_CONNECT_LARGE_ACL;
#endif
+ data->ocd_cksum_types = cksum_types_supported_client();
+
if (OBD_FAIL_CHECK(OBD_FAIL_MDC_LIGHTWEIGHT))
/* flag mdc connection as lightweight, only used for test
* purpose, use with care */
#include <uapi/linux/lustre/lustre_param.h>
#include <lustre_swab.h>
#include <obd_class.h>
+#include <lustre_osc.h>
#include "mdc_internal.h"
LASSERT(imp->imp_obd == obd);
switch (event) {
+ case IMP_EVENT_DISCON: {
+ struct client_obd *cli = &obd->u.cli;
+ spin_lock(&cli->cl_loi_list_lock);
+ cli->cl_avail_grant = 0;
+ cli->cl_lost_grant = 0;
+ spin_unlock(&cli->cl_loi_list_lock);
+ break;
+ }
case IMP_EVENT_INACTIVE: {
struct client_obd *cli = &obd->u.cli;
/*
if (rc == 0)
rc = mdc_kuc_reregister(imp);
break;
- case IMP_EVENT_OCD:
+ case IMP_EVENT_OCD: {
+ struct obd_connect_data *ocd = &imp->imp_connect_data;
+
+ if (OCD_HAS_FLAG(ocd, GRANT))
+ osc_init_grant(&obd->u.cli, ocd);
+
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD);
break;
- case IMP_EVENT_DISCON:
+ }
case IMP_EVENT_DEACTIVATE:
case IMP_EVENT_ACTIVATE:
break;
MODULES := mdt
mdt-objs := mdt_handler.o mdt_lib.o mdt_reint.o mdt_xattr.o mdt_recovery.o
mdt-objs += mdt_open.o mdt_identity.o mdt_lproc.o mdt_fs.o
-mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o
+mdt-objs += mdt_lvb.o mdt_hsm.o mdt_mds.o mdt_io.o
mdt-objs += mdt_hsm_cdt_actions.o
mdt-objs += mdt_hsm_cdt_requests.o
mdt-objs += mdt_hsm_cdt_client.o
#include <obd.h>
#include <obd_support.h>
#include <lustre_barrier.h>
-
+#include <obd_cksum.h>
#include <llog_swab.h>
#include "mdt_internal.h"
/* if no object is allocated on osts, the size on mds is valid.
* b=22272 */
b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
- } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL &&
- mdt_hsm_is_released(ma->ma_lmm)) {
- /* A released file stores its size on MDS. */
- /* But return 1 block for released file, unless tools like tar
- * will consider it fully sparse. (LU-3864)
- */
- if (unlikely(b->mbo_size == 0))
- b->mbo_blocks = 0;
- else
- b->mbo_blocks = 1;
- b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
+ if (mdt_hsm_is_released(ma->ma_lmm)) {
+ /* A released file stores its size on MDS. */
+ /* But return 1 block for released file, unless tools
+ * like tar will consider it fully sparse. (LU-3864)
+ */
+ if (unlikely(b->mbo_size == 0))
+ b->mbo_blocks = 0;
+ else
+ b->mbo_blocks = 1;
+ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ } else if (lov_pattern(ma->ma_lmm->lmm_pattern) ==
+ LOV_PATTERN_MDT) {
+ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
+ }
}
if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE))
}
/* this should sync this object */
-static int mdt_object_sync(struct mdt_thread_info *info)
+static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
+ struct mdt_object *mo)
{
- struct md_object *next;
int rc;
+
ENTRY;
- if (!mdt_object_exists(info->mti_object)) {
+ if (!mdt_object_exists(mo)) {
CWARN("%s: non existing object "DFID": rc = %d\n",
- mdt_obd_name(info->mti_mdt),
- PFID(mdt_object_fid(info->mti_object)), -ESTALE);
+ exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
+ -ESTALE);
RETURN(-ESTALE);
}
- next = mdt_object_child(info->mti_object);
- rc = mo_object_sync(info->mti_env, next);
+
+ rc = mo_object_sync(env, mdt_object_child(mo));
RETURN(rc);
}
struct mdt_thread_info *info = tsi2mdt_info(tsi);
/* sync an object */
- rc = mdt_object_sync(info);
+ rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
+ info->mti_object);
if (rc == 0) {
const struct lu_fid *fid;
struct lu_attr *la = &info->mti_attr.ma_attr;
RETURN(rc);
}
+/*
+ * OST_SYNC handler for Data-on-MDT objects.
+ *
+ * Syncs the object named by tsi->tsi_fid and, on success, packs the object
+ * id and (best effort) its attributes into the reply body.  A zero FID is
+ * a no-op, device sync is done via MDS_SYNC.
+ *
+ * \param[in] tsi	target session info of the request
+ *
+ * \retval 0		on success, or when no FID was given
+ * \retval negative	if the reply body is missing, the object cannot be
+ *			found, or the object sync fails
+ */
+static int mdt_data_sync(struct tgt_session_info *tsi)
+{
+	struct mdt_thread_info *info;
+	struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
+	struct ost_body *body = tsi->tsi_ost_body;
+	struct ost_body *repbody;
+	struct mdt_object *mo = NULL;
+	struct md_attr *ma;
+	int rc = 0;
+
+	ENTRY;
+
+	repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+	/* the reply body is written below, so refuse to go on without it
+	 * (same check as in mdt_punch_hdl()) */
+	if (repbody == NULL)
+		RETURN(err_serious(-ENOMEM));
+
+	/* if no fid is specified then do nothing,
+	 * device sync is done via MDS_SYNC */
+	if (fid_is_zero(&tsi->tsi_fid))
+		RETURN(0);
+
+	mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
+	if (IS_ERR(mo))
+		RETURN(PTR_ERR(mo));
+
+	rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo);
+	if (rc)
+		GOTO(put, rc);
+
+	repbody->oa.o_oi = body->oa.o_oi;
+	repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+	info = tsi2mdt_info(tsi);
+	ma = &info->mti_attr;
+	ma->ma_need = MA_INODE;
+	ma->ma_valid = 0;
+	rc = mdt_attr_get_complex(info, mo, ma);
+	if (rc == 0)
+		obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS);
+	else
+		/* attributes are optional in the reply: the sync itself
+		 * succeeded, so do not fail the request */
+		rc = 0;
+	mdt_thread_info_fini(info);
+
+	EXIT;
+put:
+	if (mo != NULL)
+		mdt_object_put(tsi->tsi_env, mo);
+	return rc;
+}
+
/*
* Handle quota control requests to consult current usage/limit, but also
* to configure quota enforcement
* \param mode lock mode
* \param decref force immediate lock releasing
*/
-static void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
- enum ldlm_mode mode, int decref)
+void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
+ enum ldlm_mode mode, int decref)
{
ENTRY;
return rc;
}
+/*
+ * Aliases used by the mdt_io_ops handler table below, which names the bulk
+ * opcodes OST_BRW_READ/OST_BRW_WRITE while the wire opcodes are
+ * OST_READ/OST_WRITE.  Both *_NET fail ids map to OBD_FAIL_OST_BRW_NET.
+ * NOTE(review): the fail-id aliases are presumably required because
+ * TGT_OST_HDL() derives a fail id from the opcode name -- confirm against
+ * the macro definition.
+ */
+#define OBD_FAIL_OST_READ_NET OBD_FAIL_OST_BRW_NET
+#define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
+#define OST_BRW_READ OST_READ
+#define OST_BRW_WRITE OST_WRITE
+
static struct tgt_handler mdt_tgt_handlers[] = {
TGT_RPC_HANDLER(MDS_FIRST_OPC,
0, MDS_CONNECT, mdt_tgt_connect,
mdt_swap_layouts),
};
+/*
+ * Handlers for the OST-style IO requests served by the MDT for
+ * Data-on-MDT files: bulk read and write go to the generic target code
+ * (tgt_brw_read/tgt_brw_write), punch and sync to the MDT-specific
+ * mdt_punch_hdl() and mdt_data_sync().
+ */
+static struct tgt_handler mdt_io_ops[] = {
+TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_BRW_READ, tgt_brw_read),
+TGT_OST_HDL(HABEO_CORPUS | MUTABOR, OST_BRW_WRITE, tgt_brw_write),
+TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO | MUTABOR,
+ OST_PUNCH, mdt_punch_hdl),
+TGT_OST_HDL(HABEO_CORPUS | HABEO_REFERO, OST_SYNC, mdt_data_sync),
+};
+
static struct tgt_handler mdt_sec_ctx_ops[] = {
TGT_SEC_HDL_VAR(0, SEC_CTX_INIT, mdt_sec_ctx_handle),
TGT_SEC_HDL_VAR(0, SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
.tos_opc_end = LFSCK_LAST_OPC,
.tos_hs = tgt_lfsck_handlers
},
-
+ {
+ .tos_opc_start = OST_FIRST_OPC,
+ .tos_opc_end = OST_LAST_OPC,
+ .tos_hs = mdt_io_ops
+ },
{
.tos_hs = NULL
}
o->lo_ops = &mdt_obj_ops;
spin_lock_init(&mo->mot_write_lock);
mutex_init(&mo->mot_lov_mutex);
+ init_rwsem(&mo->mot_dom_sem);
init_rwsem(&mo->mot_open_sem);
atomic_set(&mo->mot_open_count, 0);
RETURN(o);
* \retval -EPROTO \a data unexpectedly has zero obd_connect_data::ocd_brw_size
* \retval -EBADE client and server feature requirements are incompatible
*/
-static int mdt_connect_internal(struct obd_export *exp,
+static int mdt_connect_internal(const struct lu_env *env,
+ struct obd_export *exp,
struct mdt_device *mdt,
- struct obd_connect_data *data)
+ struct obd_connect_data *data, bool reconnect)
{
LASSERT(data != NULL);
}
}
+ if (OCD_HAS_FLAG(data, GRANT))
+ data->ocd_grant = mdt_grant_connect(env, exp, data->ocd_grant,
+ !reconnect);
+
/* NB: Disregard the rule against updating
* exp_connect_data.ocd_connect_flags in this case, since
* tgt_client_new() needs to know if this is a lightweight
spin_unlock(&exp->exp_lock);
}
+ if (OCD_HAS_FLAG(data, CKSUM)) {
+ __u32 cksum_types = data->ocd_cksum_types;
+
+ /* The client set in ocd_cksum_types the checksum types it
+ * supports. We have to mask off the algorithms that we don't
+ * support */
+ data->ocd_cksum_types &= cksum_types_supported_server();
+
+ if (unlikely(data->ocd_cksum_types == 0)) {
+ CERROR("%s: Connect with checksum support but no "
+ "ocd_cksum_types is set\n",
+ exp->exp_obd->obd_name);
+ RETURN(-EPROTO);
+ }
+
+ CDEBUG(D_RPCTRACE, "%s: cli %s supports cksum type %x, return "
+ "%x\n", exp->exp_obd->obd_name, obd_export_nid2str(exp),
+ cksum_types, data->ocd_cksum_types);
+ } else {
+ /* This client does not support OBD_CONNECT_CKSUM
+ * fall back to CRC32 */
+ CDEBUG(D_RPCTRACE, "%s: cli %s does not support "
+ "OBD_CONNECT_CKSUM, CRC32 will be used\n",
+ exp->exp_obd->obd_name, obd_export_nid2str(exp));
+ }
+
return 0;
}
if (rc != 0 && rc != -EEXIST)
GOTO(out, rc);
- rc = mdt_connect_internal(lexp, mdt, data);
+ rc = mdt_connect_internal(env, lexp, mdt, data, false);
if (rc == 0) {
struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd;
if (rc != 0 && rc != -EEXIST)
RETURN(rc);
- rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
+ rc = mdt_connect_internal(env, exp, mdt_dev(obd->obd_lu_dev), data,
+ true);
if (rc == 0)
mdt_export_stats_init(obd, exp, localdata);
else
.o_destroy_export = mdt_destroy_export,
.o_iocontrol = mdt_iocontrol,
.o_postrecov = mdt_obd_postrecov,
+ /* Data-on-MDT IO methods */
+ .o_preprw = mdt_obd_preprw,
+ .o_commitrw = mdt_obd_commitrw,
};
static struct lu_device* mdt_device_fini(const struct lu_env *env,
spinlock_t mot_write_lock;
/* Lock to protect create_data */
struct mutex mot_lov_mutex;
+ /* lock to protect read/write stages for Data-on-MDT files */
+ struct rw_semaphore mot_dom_sem;
/* Lock to protect lease open.
* Lease open acquires write lock; normal open acquires read lock */
struct rw_semaphore mot_open_sem;
void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *mo,
struct mdt_lock_handle *lh, int decref);
+void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
+ enum ldlm_mode mode, int decref);
struct mdt_object *mdt_object_new(const struct lu_env *env,
struct mdt_device *,
LPROC_MDT_SETXATTR,
LPROC_MDT_STATFS,
LPROC_MDT_SYNC,
- LPROC_MDT_SAMEDIR_RENAME,
- LPROC_MDT_CROSSDIR_RENAME,
- LPROC_MDT_LAST,
+ LPROC_MDT_SAMEDIR_RENAME,
+ LPROC_MDT_CROSSDIR_RENAME,
+ LPROC_MDT_IO_READ,
+ LPROC_MDT_IO_WRITE,
+ LPROC_MDT_IO_PUNCH,
+ LPROC_MDT_LAST,
};
void mdt_counter_incr(struct ptlrpc_request *req, int opcode);
void mdt_stats_counter_init(struct lprocfs_stats *stats);
return jobid;
}
+/* MDT IO */
+
+#define VALID_FLAGS (LA_TYPE | LA_MODE | LA_SIZE | LA_BLOCKS | \
+ LA_BLKSIZE | LA_ATIME | LA_MTIME | LA_CTIME)
+
+int mdt_obd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
+ struct obdo *oa, int objcount, struct obd_ioobj *obj,
+ struct niobuf_remote *rnb, int *nr_local,
+ struct niobuf_local *lnb);
+
+int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
+ struct obdo *oa, int objcount, struct obd_ioobj *obj,
+ struct niobuf_remote *rnb, int npages,
+ struct niobuf_local *lnb, int old_rc);
+int mdt_punch_hdl(struct tgt_session_info *tsi);
+
+/* grants */
+long mdt_grant_connect(const struct lu_env *env, struct obd_export *exp,
+ u64 want, bool conservative);
+
#endif /* _MDT_INTERNAL_H */
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2012, 2017 Intel Corporation.
+ */
+/*
+ * lustre/mdt/mdt_io.c
+ *
+ * Author: Mikhail Pershin <mike.pershin@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_FILTER
+
+#include <dt_object.h>
+#include "mdt_internal.h"
+
+/* --------------- MDT grant code ---------------- */
+
+/*
+ * Compute the grant handed to a client at connect time.
+ *
+ * Placeholder policy: refresh the cached statfs data of the bottom device
+ * and grant half of the space currently available.  \a want is only logged
+ * and \a conservative is not used yet.
+ *
+ * \param[in] env		execution environment
+ * \param[in] exp		export of the connecting client
+ * \param[in] want		grant amount requested by the client
+ * \param[in] conservative	unused for now
+ *
+ * \retval	grant in bytes, 0 if the statfs data cannot be refreshed
+ */
+long mdt_grant_connect(const struct lu_env *env,
+		       struct obd_export *exp,
+		       u64 want, bool conservative)
+{
+	struct mdt_device *mdt = mdt_exp2dev(exp);
+	u64 left;
+	long grant;
+	int rc;
+
+	ENTRY;
+
+	rc = dt_statfs(env, mdt->mdt_bottom, &mdt->mdt_osfs);
+	if (rc != 0) {
+		/* do not grant space based on stale or unset statfs data */
+		CERROR("%s: statfs failed, no grant for cli %s: rc = %d\n",
+		       exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, rc);
+		RETURN(0);
+	}
+
+	left = (mdt->mdt_osfs.os_bavail * mdt->mdt_osfs.os_bsize) / 2;
+
+	grant = left;
+
+	CDEBUG(D_CACHE, "%s: cli %s/%p ocd_grant: %ld want: %llu left: %llu\n",
+	       exp->exp_obd->obd_name, exp->exp_client_uuid.uuid,
+	       exp, grant, want, left);
+
+	/* RETURN() balances the ENTRY trace record above */
+	RETURN(grant);
+}
+
+/*
+ * Fill in the grant returned to the client with a bulk write reply.
+ *
+ * Placeholder policy: set oa->o_grant to half of the available space as
+ * recorded in the cached mdt_osfs (the statfs data is not refreshed here).
+ * \a env, \a rnb and \a niocount are not used yet.
+ *
+ * \param[in] env	execution environment
+ * \param[in] exp	export of the writing client
+ * \param[in,out] oa	obdo of the request, o_grant is overwritten
+ * \param[in] rnb	remote buffers of the write
+ * \param[in] niocount	number of remote buffers
+ */
+void mdt_grant_prepare_write(const struct lu_env *env,
+			     struct obd_export *exp, struct obdo *oa,
+			     struct niobuf_remote *rnb, int niocount)
+{
+	struct mdt_device *mdt = mdt_exp2dev(exp);
+	u64 left;
+
+	ENTRY;
+
+	left = (mdt->mdt_osfs.os_bavail * mdt->mdt_osfs.os_bsize) / 2;
+
+	/* grant more space back to the client if possible */
+	oa->o_grant = left;
+
+	/* balances the ENTRY trace record above */
+	EXIT;
+}
+/* ---------------- end of MDT grant code ---------------- */
+
+/* functions below are stubs for now, they will be implemented with
+ * grant support on MDT */
+
+/*
+ * No-op placeholder for IO statistics accounting.  Callers in this file
+ * pass an LPROC_MDT_IO_* opcode, the job id and an amount (a byte total
+ * for read/write, 1 for punch); all arguments are ignored for now.
+ */
+static inline void mdt_io_counter_incr(struct obd_export *exp, int opcode,
+ char *jobid, long amount)
+{
+ return;
+}
+
+/*
+ * No-op placeholder, called on the read path and on the failure path of
+ * mdt_preprw_write(); all arguments are ignored for now.
+ */
+void mdt_grant_prepare_read(const struct lu_env *env,
+ struct obd_export *exp, struct obdo *oa)
+{
+ return;
+}
+
+/*
+ * No-op placeholder, called once a write has been committed or abandoned;
+ * \a pending and \a rc are ignored for now.
+ */
+void mdt_grant_commit(struct obd_export *exp, unsigned long pending,
+ int rc)
+{
+ return;
+
+}
+
+/* Take the object's Data-on-MDT semaphore (mot_dom_sem) for reading. */
+static inline void mdt_dom_read_lock(struct mdt_object *mo)
+{
+ down_read(&mo->mot_dom_sem);
+}
+
+/* Release a read hold on the object's Data-on-MDT semaphore. */
+static inline void mdt_dom_read_unlock(struct mdt_object *mo)
+{
+ up_read(&mo->mot_dom_sem);
+}
+
+/* Take the object's Data-on-MDT semaphore (mot_dom_sem) for writing. */
+static inline void mdt_dom_write_lock(struct mdt_object *mo)
+{
+ down_write(&mo->mot_dom_sem);
+}
+
+/* Release a write hold on the object's Data-on-MDT semaphore. */
+static inline void mdt_dom_write_unlock(struct mdt_object *mo)
+{
+ up_write(&mo->mot_dom_sem);
+}
+
+/*
+ * Prepare local buffers for a bulk read from a Data-on-MDT object.
+ *
+ * Takes the DoM semaphore of \a mo for reading, maps each of the
+ * \a niocount remote buffers to local buffers with dt_bufs_get(), reads
+ * the object attributes into \a la and calls dt_read_prep() on the local
+ * buffers.  *nr_local is set to the number of local buffers obtained.
+ *
+ * On success (0) the DoM read lock is still held on return; in this file
+ * it is dropped by mdt_commitrw_read().  On failure the local buffers
+ * obtained so far are released, the lock is dropped and the error is
+ * returned (-ENOENT if the object does not exist).
+ *
+ * \a mdt is not used by this function.
+ */
+static int mdt_preprw_read(const struct lu_env *env, struct obd_export *exp,
+ struct mdt_device *mdt, struct mdt_object *mo,
+ struct lu_attr *la, int niocount,
+ struct niobuf_remote *rnb, int *nr_local,
+ struct niobuf_local *lnb, char *jobid)
+{
+ struct dt_object *dob;
+ int i, j, rc, tot_bytes = 0;
+
+ ENTRY;
+
+ mdt_dom_read_lock(mo);
+ if (!mdt_object_exists(mo))
+ GOTO(unlock, rc = -ENOENT);
+
+ dob = mdt_obj2dt(mo);
+ /* parse remote buffers to local buffers and prepare the latter */
+ *nr_local = 0;
+ for (i = 0, j = 0; i < niocount; i++) {
+ rc = dt_bufs_get(env, dob, rnb + i, lnb + j, 0);
+ if (unlikely(rc < 0))
+ GOTO(buf_put, rc);
+ /* correct index for local buffers to continue with */
+ j += rc;
+ *nr_local += rc;
+ tot_bytes += rnb[i].rnb_len;
+ }
+
+ rc = dt_attr_get(env, dob, la);
+ if (unlikely(rc))
+ GOTO(buf_put, rc);
+
+ rc = dt_read_prep(env, dob, lnb, *nr_local);
+ if (unlikely(rc))
+ GOTO(buf_put, rc);
+
+ mdt_io_counter_incr(exp, LPROC_MDT_IO_READ, jobid, tot_bytes);
+ /* success: buffers and the DoM read lock stay held for the commit */
+ RETURN(0);
+buf_put:
+ dt_bufs_put(env, dob, lnb, *nr_local);
+unlock:
+ mdt_dom_read_unlock(mo);
+ return rc;
+}
+
+/*
+ * Prepare local buffers for a bulk write to a Data-on-MDT object.
+ *
+ * Fills in the grant for the reply, takes the DoM semaphore of \a mo for
+ * reading, maps each remote buffer of \a obj to local buffers with
+ * dt_bufs_get() (copying the remote flags to every local buffer) and calls
+ * dt_write_prep().  *nr_local is set to the number of local buffers.
+ *
+ * On success (0) the buffers and the DoM read lock stay held; in this file
+ * they are released by mdt_commitrw_write().  On failure the buffers are
+ * released, the lock is dropped, the grant is committed and the error is
+ * returned (-ENOENT if the object does not exist).
+ *
+ * \a mdt, \a la and \a objcount are not used by this function.
+ */
+static int mdt_preprw_write(const struct lu_env *env, struct obd_export *exp,
+			    struct mdt_device *mdt, struct mdt_object *mo,
+			    struct lu_attr *la, struct obdo *oa,
+			    int objcount, struct obd_ioobj *obj,
+			    struct niobuf_remote *rnb, int *nr_local,
+			    struct niobuf_local *lnb, char *jobid)
+{
+	struct dt_object *dob;
+	int i, j, k, rc = 0, tot_bytes = 0;
+
+	ENTRY;
+
+	/* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
+	 * space back if possible */
+	mdt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
+
+	mdt_dom_read_lock(mo);
+	if (!mdt_object_exists(mo)) {
+		CDEBUG(D_ERROR, "%s: BRW to missing obj "DFID"\n",
+		       exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)));
+		GOTO(unlock, rc = -ENOENT);
+	}
+
+	dob = mdt_obj2dt(mo);
+	/* parse remote buffers to local buffers and prepare the latter */
+	*nr_local = 0;
+	for (i = 0, j = 0; i < obj->ioo_bufcnt; i++) {
+		rc = dt_bufs_get(env, dob, rnb + i, lnb + j, 1);
+		if (unlikely(rc < 0))
+			GOTO(err, rc);
+		/* propagate the remote flags to each local buffer */
+		for (k = 0; k < rc; k++)
+			lnb[j+k].lnb_flags = rnb[i].rnb_flags;
+		/* correct index for local buffers to continue with */
+		j += rc;
+		*nr_local += rc;
+		tot_bytes += rnb[i].rnb_len;
+	}
+
+	rc = dt_write_prep(env, dob, lnb, *nr_local);
+	/* failure is the rare case here, as in mdt_preprw_read() */
+	if (unlikely(rc))
+		GOTO(err, rc);
+
+	mdt_io_counter_incr(exp, LPROC_MDT_IO_WRITE, jobid, tot_bytes);
+	/* success: buffers and the DoM read lock stay held for the commit */
+	RETURN(0);
+err:
+	dt_bufs_put(env, dob, lnb, *nr_local);
+unlock:
+	mdt_dom_read_unlock(mo);
+	/* mdt_grant_prepare_write() was called, so we must commit */
+	mdt_grant_commit(exp, oa->o_grant_used, rc);
+	/* let's still process incoming grant information packed in the oa,
+	 * but without enforcing grant since we won't proceed with the write.
+	 * Just like a read request actually. */
+	mdt_grant_prepare_read(env, exp, oa);
+	return rc;
+}
+
+/*
+ * o_preprw method of the MDT: prepare a bulk read or write on a
+ * Data-on-MDT object.
+ *
+ * Looks up the object by tsi->tsi_fid, stores it in info->mti_object for
+ * the matching mdt_obd_commitrw() call and dispatches to
+ * mdt_preprw_write() or mdt_preprw_read() according to \a cmd.
+ *
+ * \param[in] env	execution environment
+ * \param[in] cmd	OBD_BRW_WRITE or OBD_BRW_READ
+ * \param[in] exp	export of the client
+ * \param[in,out] oa	obdo of the request
+ * \param[in] objcount	number of IO objects, must be 1
+ * \param[in] obj	IO object, must describe at least one buffer
+ * \param[in] rnb	remote buffers
+ * \param[in,out] nr_local	capacity of \a lnb on entry (clamped to
+ *				MD_MAX_BRW_PAGES), number of local buffers
+ *				prepared on return
+ * \param[out] lnb	local buffers
+ *
+ * \retval 0		on success, the object reference is kept in
+ *			info->mti_object
+ * \retval -EPROTO	on bad parameters or an unknown \a cmd
+ * \retval negative	other errors from the lookup or the helpers
+ */
+int mdt_obd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
+		   struct obdo *oa, int objcount, struct obd_ioobj *obj,
+		   struct niobuf_remote *rnb, int *nr_local,
+		   struct niobuf_local *lnb)
+{
+	struct tgt_session_info *tsi = tgt_ses_info(env);
+	struct mdt_thread_info *info = tsi2mdt_info(tsi);
+	struct lu_attr *la = &info->mti_attr.ma_attr;
+	struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
+	struct mdt_object *mo;
+	char *jobid;
+	int rc = 0;
+
+	ENTRY;
+
+	/* The default value PTLRPC_MAX_BRW_PAGES is set in tgt_brw_write()
+	 * but for MDT it is different, correct it here. */
+	if (*nr_local > MD_MAX_BRW_PAGES)
+		*nr_local = MD_MAX_BRW_PAGES;
+
+	jobid = tsi->tsi_jobid;
+
+	if (!oa || objcount != 1 || obj->ioo_bufcnt == 0) {
+		CERROR("%s: bad parameters %p/%i/%i\n",
+		       exp->exp_obd->obd_name, oa, objcount, obj->ioo_bufcnt);
+		/* stop here: the code below dereferences oa and would
+		 * otherwise overwrite this error */
+		GOTO(out, rc = -EPROTO);
+	}
+
+	mo = mdt_object_find(env, mdt, &tsi->tsi_fid);
+	if (IS_ERR(mo))
+		GOTO(out, rc = PTR_ERR(mo));
+
+	LASSERT(info->mti_object == NULL);
+	info->mti_object = mo;
+
+	if (cmd == OBD_BRW_WRITE) {
+		la_from_obdo(la, oa, OBD_MD_FLGETATTR);
+		rc = mdt_preprw_write(env, exp, mdt, mo, la, oa,
+				      objcount, obj, rnb, nr_local, lnb,
+				      jobid);
+	} else if (cmd == OBD_BRW_READ) {
+		mdt_grant_prepare_read(env, exp, oa);
+		rc = mdt_preprw_read(env, exp, mdt, mo, la,
+				     obj->ioo_bufcnt, rnb, nr_local, lnb,
+				     jobid);
+		obdo_from_la(oa, la, LA_ATIME);
+	} else {
+		CERROR("%s: wrong cmd %d received!\n",
+		       exp->exp_obd->obd_name, cmd);
+		rc = -EPROTO;
+	}
+	if (rc) {
+		/* no commit will follow, drop the reference taken above */
+		lu_object_put(env, &mo->mot_obj);
+		info->mti_object = NULL;
+	}
+out:
+	RETURN(rc);
+}
+
+/*
+ * Finish a bulk read from a Data-on-MDT object: release the \a niocount
+ * local buffers and drop the DoM read lock that mdt_preprw_read() left
+ * held.  Always returns 0; \a mdt and \a objcount are not used.
+ */
+static int mdt_commitrw_read(const struct lu_env *env, struct mdt_device *mdt,
+			     struct mdt_object *mo, int objcount, int niocount,
+			     struct niobuf_local *lnb)
+{
+	ENTRY;
+
+	LASSERT(niocount > 0);
+
+	dt_bufs_put(env, mdt_obj2dt(mo), lnb, niocount);
+	mdt_dom_read_unlock(mo);
+
+	RETURN(0);
+}
+
+/*
+ * Commit the local buffers of a bulk write to a Data-on-MDT object.
+ *
+ * If \a old_rc is non-zero no transaction is started and old_rc is
+ * returned.  Otherwise a transaction is created, the write (plus an
+ * attribute update when any of LA_ATIME/LA_MTIME/LA_CTIME is set in
+ * \a la) is declared, then executed under dt_write_lock(), and the
+ * resulting attributes are read back into \a la.  The transaction is
+ * marked synchronous if any local buffer lacks OBD_BRW_ASYNC.  On -ENOSPC
+ * the transaction is stopped with th_sync set and the sequence is retried,
+ * up to three retries.
+ *
+ * In all cases the local buffers are released, the DoM read lock left held
+ * by mdt_preprw_write() is dropped and mdt_grant_commit() is called.
+ *
+ * \a objcount is not used by this function.
+ */
+static int mdt_commitrw_write(const struct lu_env *env, struct obd_export *exp,
+ struct mdt_device *mdt, struct mdt_object *mo,
+ struct lu_attr *la, int objcount, int niocount,
+ struct niobuf_local *lnb, unsigned long granted,
+ int old_rc)
+{
+ struct dt_device *dt = mdt->mdt_bottom;
+ struct dt_object *dob;
+ struct thandle *th;
+ int rc = 0;
+ int retries = 0;
+ int i;
+
+ ENTRY;
+
+ dob = mdt_obj2dt(mo);
+
+ if (old_rc)
+ GOTO(out, rc = old_rc);
+
+ /* only the timestamps may be updated along with the write */
+ la->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME;
+retry:
+ if (!dt_object_exists(dob))
+ GOTO(out, rc = -ENOENT);
+
+ th = dt_trans_create(env, dt);
+ if (IS_ERR(th))
+ GOTO(out, rc = PTR_ERR(th));
+
+ /* a single non-async buffer makes the whole transaction sync */
+ for (i = 0; i < niocount; i++) {
+ if (!(lnb[i].lnb_flags & OBD_BRW_ASYNC)) {
+ th->th_sync = 1;
+ break;
+ }
+ }
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_DQACQ_NET))
+ GOTO(out_stop, rc = -EINPROGRESS);
+
+ rc = dt_declare_write_commit(env, dob, lnb, niocount, th);
+ if (rc)
+ GOTO(out_stop, rc);
+
+ if (la->la_valid) {
+ /* update [mac]time if needed */
+ rc = dt_declare_attr_set(env, dob, la, th);
+ if (rc)
+ GOTO(out_stop, rc);
+ }
+
+ rc = dt_trans_start(env, dt, th);
+ if (rc)
+ GOTO(out_stop, rc);
+
+ dt_write_lock(env, dob, 0);
+ rc = dt_write_commit(env, dob, lnb, niocount, th);
+ if (rc)
+ GOTO(unlock, rc);
+
+ if (la->la_valid) {
+ rc = dt_attr_set(env, dob, la, th);
+ if (rc)
+ GOTO(unlock, rc);
+ }
+ /* get attr to return */
+ rc = dt_attr_get(env, dob, la);
+unlock:
+ dt_write_unlock(env, dob);
+
+out_stop:
+ /* Force commit to make the just-deleted blocks
+ * reusable. LU-456 */
+ if (rc == -ENOSPC)
+ th->th_sync = 1;
+
+ th->th_result = rc;
+ dt_trans_stop(env, dt, th);
+ if (rc == -ENOSPC && retries++ < 3) {
+ CDEBUG(D_INODE, "retry after force commit, retries:%d\n",
+ retries);
+ goto retry;
+ }
+
+out:
+ dt_bufs_put(env, dob, lnb, niocount);
+ mdt_dom_read_unlock(mo);
+ mdt_grant_commit(exp, granted, old_rc);
+ RETURN(rc);
+}
+
+/*
+ * o_commitrw method of the MDT: finish a bulk read or write prepared by
+ * mdt_obd_preprw() on the object kept in info->mti_object.
+ *
+ * For a write the timestamps and uid/gid are taken from \a oa, the
+ * buffers are committed through mdt_commitrw_write() and the resulting
+ * attributes and over-quota flags are packed back into \a oa.  For a read
+ * the buffers are released through mdt_commitrw_read().  The thread info
+ * is finalized in all cases.
+ *
+ * \param[in] env	execution environment
+ * \param[in] cmd	OBD_BRW_WRITE or OBD_BRW_READ
+ * \param[in] exp	export of the client
+ * \param[in,out] oa	obdo of the request/reply
+ * \param[in] objcount	number of IO objects
+ * \param[in] obj	IO object (unused here)
+ * \param[in] rnb	remote buffers (unused here)
+ * \param[in] npages	number of local buffers in \a lnb
+ * \param[in] lnb	local buffers prepared by mdt_obd_preprw()
+ * \param[in] old_rc	result of the bulk transfer
+ *
+ * \retval 0		on success
+ * \retval -EPROTO	if \a npages is zero (for a write) or \a cmd is
+ *			unknown
+ * \retval negative	\a old_rc or an error from the commit helpers
+ */
+int mdt_obd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp,
+		     struct obdo *oa, int objcount, struct obd_ioobj *obj,
+		     struct niobuf_remote *rnb, int npages,
+		     struct niobuf_local *lnb, int old_rc)
+{
+	struct mdt_thread_info *info = mdt_th_info(env);
+	struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
+	struct mdt_object *mo = info->mti_object;
+	struct lu_attr *la = &info->mti_attr.ma_attr;
+	__u64 valid;
+	int rc = 0;
+
+	ENTRY;
+
+	if (npages == 0) {
+		CERROR("%s: no pages to commit\n",
+		       exp->exp_obd->obd_name);
+		/* Fold the error into old_rc, otherwise it is overwritten
+		 * below.  With old_rc set the write path skips the commit
+		 * and the lnb[0] quota checks but still releases the lock.
+		 * The read path is unchanged: mdt_commitrw_read() asserts
+		 * that there is at least one page. */
+		if (old_rc == 0)
+			old_rc = -EPROTO;
+	}
+
+	LASSERT(mo);
+
+	if (cmd == OBD_BRW_WRITE) {
+		/* Don't update timestamps if this write is older than a
+		 * setattr which modifies the timestamps. b=10150 */
+
+		/* XXX when we start having persistent reservations this needs
+		 * to be changed to ofd_fmd_get() to create the fmd if it
+		 * doesn't already exist so we can store the reservation handle
+		 * there. */
+		valid = OBD_MD_FLUID | OBD_MD_FLGID;
+		valid |= OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME;
+
+		la_from_obdo(la, oa, valid);
+
+		rc = mdt_commitrw_write(env, exp, mdt, mo, la, objcount,
+					npages, lnb, oa->o_grant_used, old_rc);
+		if (rc == 0)
+			obdo_from_la(oa, la, VALID_FLAGS | LA_GID | LA_UID);
+		else
+			obdo_from_la(oa, la, LA_GID | LA_UID);
+
+		/* don't report overquota flag if we failed before reaching
+		 * commit */
+		if (old_rc == 0 && (rc == 0 || rc == -EDQUOT)) {
+			/* return the overquota flags to client */
+			if (lnb[0].lnb_flags & OBD_BRW_OVER_USRQUOTA) {
+				if (oa->o_valid & OBD_MD_FLFLAGS)
+					oa->o_flags |= OBD_FL_NO_USRQUOTA;
+				else
+					oa->o_flags = OBD_FL_NO_USRQUOTA;
+			}
+
+			if (lnb[0].lnb_flags & OBD_BRW_OVER_GRPQUOTA) {
+				if (oa->o_valid & OBD_MD_FLFLAGS)
+					oa->o_flags |= OBD_FL_NO_GRPQUOTA;
+				else
+					oa->o_flags = OBD_FL_NO_GRPQUOTA;
+			}
+
+			oa->o_valid |= OBD_MD_FLFLAGS | OBD_MD_FLUSRQUOTA |
+				       OBD_MD_FLGRPQUOTA;
+		}
+	} else if (cmd == OBD_BRW_READ) {
+		rc = mdt_commitrw_read(env, mdt, mo, objcount, npages, lnb);
+		if (old_rc)
+			rc = old_rc;
+	} else {
+		rc = -EPROTO;
+	}
+	/* this pairs with tsi2mdt_info() and the object reference taken
+	 * in mdt_obd_preprw() */
+	mdt_thread_info_fini(info);
+	RETURN(rc);
+}
+
+/*
+ * Truncate \a dob at offset \a start and apply the attributes in \a la,
+ * both inside a single transaction on \a dt.
+ *
+ * Only truncate is supported: \a end must be OBD_OBJECT_EOF (asserted),
+ * and OBD_OBJECT_EOF is what is passed down to the punch calls.
+ *
+ * Returns -ENOENT if the object does not exist, the error of
+ * dt_trans_create() if no transaction can be created, otherwise the
+ * result of the declare/start/punch/attr_set sequence.
+ */
+int mdt_object_punch(const struct lu_env *env, struct dt_device *dt,
+ struct dt_object *dob, __u64 start, __u64 end,
+ struct lu_attr *la)
+{
+ struct thandle *th;
+ int rc;
+
+ ENTRY;
+
+ /* we support truncate, not punch yet */
+ LASSERT(end == OBD_OBJECT_EOF);
+
+ if (!dt_object_exists(dob))
+ RETURN(-ENOENT);
+
+ th = dt_trans_create(env, dt);
+ if (IS_ERR(th))
+ RETURN(PTR_ERR(th));
+
+ rc = dt_declare_attr_set(env, dob, la, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ tgt_vbr_obj_set(env, dob);
+ rc = dt_trans_start(env, dt, th);
+ if (rc)
+ GOTO(stop, rc);
+
+ dt_write_lock(env, dob, 0);
+ rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
+ if (rc)
+ GOTO(unlock, rc);
+ rc = dt_attr_set(env, dob, la, th);
+ if (rc)
+ GOTO(unlock, rc);
+unlock:
+ dt_write_unlock(env, dob);
+stop:
+ /* record the outcome in the handle before stopping it */
+ th->th_result = rc;
+ dt_trans_stop(env, dt, th);
+ RETURN(rc);
+}
+
+/*
+ * OST_PUNCH handler for Data-on-MDT objects.
+ *
+ * The truncate offset and end arrive in oa->o_size and oa->o_blocks; only
+ * truncate (end == OBD_OBJECT_EOF) is supported.  When the client asks for
+ * server-side locking (OBD_FL_SRVLOCK on an export with
+ * OBD_CONNECT_SRVLOCK) a PW data lock is taken around the operation.  The
+ * object is truncated under its DoM write semaphore and, on success, the
+ * LVB of the resource is refreshed.
+ *
+ * \param[in] tsi	target session info of the request
+ *
+ * \retval 0		on success
+ * \retval -EPROTO	if size/blocks are not both valid (serious) or a
+ *			real punch range is requested
+ * \retval -ENOMEM	if the reply body is missing (serious)
+ * \retval -ENOENT	if the object does not exist
+ * \retval negative	other errors from locking, lookup or the punch
+ */
+int mdt_punch_hdl(struct tgt_session_info *tsi)
+{
+	const struct obdo *oa = &tsi->tsi_ost_body->oa;
+	struct ost_body *repbody;
+	struct mdt_thread_info *info;
+	struct lu_attr *la;
+	struct ldlm_namespace *ns = tsi->tsi_tgt->lut_obd->obd_namespace;
+	struct obd_export *exp = tsi->tsi_exp;
+	struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
+	struct mdt_object *mo;
+	struct dt_object *dob;
+	__u64 flags = 0;
+	struct lustre_handle lh = { 0, };
+	__u64 start, end;
+	int rc;
+	bool srvlock;
+
+	ENTRY;
+
+	/* check that we do support OBD_CONNECT_TRUNCLOCK. */
+	CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
+
+	if ((oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
+	    (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
+		RETURN(err_serious(-EPROTO));
+
+	repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+	if (repbody == NULL)
+		RETURN(err_serious(-ENOMEM));
+
+	/* punch start,end are passed in o_size,o_blocks through the wire */
+	start = oa->o_size;
+	end = oa->o_blocks;
+
+	if (end != OBD_OBJECT_EOF) /* Only truncate is supported */
+		RETURN(-EPROTO);
+
+	info = tsi2mdt_info(tsi);
+	la = &info->mti_attr.ma_attr;
+	/* standard truncate optimization: if file body is completely
+	 * destroyed, don't send data back to the server. */
+	if (start == 0)
+		flags |= LDLM_FL_AST_DISCARD_DATA;
+
+	repbody->oa.o_oi = oa->o_oi;
+	repbody->oa.o_valid = OBD_MD_FLID;
+
+	srvlock = (exp_connect_flags(exp) & OBD_CONNECT_SRVLOCK) &&
+		  oa->o_valid & OBD_MD_FLFLAGS &&
+		  oa->o_flags & OBD_FL_SRVLOCK;
+
+	if (srvlock) {
+		rc = tgt_mdt_data_lock(ns, &tsi->tsi_resid, &lh, LCK_PW,
+				       &flags);
+		if (rc != 0)
+			GOTO(out, rc);
+	}
+
+	CDEBUG(D_INODE, "calling punch for object "DFID", valid = %#llx"
+	       ", start = %lld, end = %lld\n", PFID(&tsi->tsi_fid),
+	       oa->o_valid, start, end);
+
+	mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
+	if (IS_ERR(mo))
+		GOTO(out_unlock, rc = PTR_ERR(mo));
+
+	mdt_dom_write_lock(mo);
+	if (!mdt_object_exists(mo)) {
+		/* out_put does not release the DoM semaphore, so drop it
+		 * here instead of leaking the write hold */
+		mdt_dom_write_unlock(mo);
+		GOTO(out_put, rc = -ENOENT);
+	}
+	dob = mdt_obj2dt(mo);
+
+	la_from_obdo(la, oa, OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME);
+	la->la_size = start;
+	la->la_valid |= LA_SIZE;
+
+	rc = mdt_object_punch(tsi->tsi_env, mdt->mdt_bottom, dob,
+			      start, end, la);
+	mdt_dom_write_unlock(mo);
+	if (rc)
+		GOTO(out_put, rc);
+
+	mdt_io_counter_incr(tsi->tsi_exp, LPROC_MDT_IO_PUNCH,
+			    tsi->tsi_jobid, 1);
+	EXIT;
+out_put:
+	lu_object_put(tsi->tsi_env, &mo->mot_obj);
+out_unlock:
+	if (srvlock)
+		mdt_save_lock(info, &lh, LCK_PW, rc);
+out:
+	mdt_thread_info_fini(info);
+	if (rc == 0) {
+		struct ldlm_resource *res;
+
+		/* we do not call this before to avoid lu_object_find() in
+		 * ->lvbo_update() holding another reference on the object.
+		 * otherwise concurrent destroy can make the object unavailable
+		 * for 2nd lu_object_find() waiting for the first reference
+		 * to go... deadlock! */
+		res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid,
+					LDLM_IBITS, 0);
+		if (!IS_ERR(res)) {
+			ldlm_res_lvbo_update(res, NULL, 0);
+			ldlm_resource_putref(res);
+		}
+	}
+	return rc;
+}
+
void mdt_stats_counter_init(struct lprocfs_stats *stats)
{
+ LASSERT(stats && stats->ls_num >= LPROC_MDT_LAST);
+
lprocfs_counter_init(stats, LPROC_MDT_OPEN, 0, "open", "reqs");
lprocfs_counter_init(stats, LPROC_MDT_CLOSE, 0, "close", "reqs");
lprocfs_counter_init(stats, LPROC_MDT_MKNOD, 0, "mknod", "reqs");
"samedir_rename", "reqs");
lprocfs_counter_init(stats, LPROC_MDT_CROSSDIR_RENAME, 0,
"crossdir_rename", "reqs");
+ lprocfs_counter_init(stats, LPROC_MDT_IO_READ,
+ LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
+ lprocfs_counter_init(stats, LPROC_MDT_IO_WRITE,
+ LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
+ lprocfs_counter_init(stats, LPROC_MDT_IO_PUNCH, 0, "punch", "reqs");
}
int mdt_procfs_init(struct mdt_device *mdt, const char *name)
struct ptlrpc_service *mds_mdsc_service;
struct ptlrpc_service *mds_mdss_service;
struct ptlrpc_service *mds_fld_service;
+ struct ptlrpc_service *mds_io_service;
struct mutex mds_health_mutex;
struct kset *mds_kset;
};
module_param(mds_num_threads, ulong, 0444);
MODULE_PARM_DESC(mds_num_threads, "number of MDS service threads to start");
+int mds_max_io_threads = 512;
+module_param(mds_max_io_threads, int, 0444);
+MODULE_PARM_DESC(mds_max_io_threads, "maximum number of MDS IO service threads");
+
static char *mds_num_cpts;
module_param(mds_num_cpts, charp, 0444);
MODULE_PARM_DESC(mds_num_cpts, "CPU partitions MDS threads should run on");
ptlrpc_unregister_service(m->mds_fld_service);
m->mds_fld_service = NULL;
}
+ if (m->mds_io_service != NULL) {
+ ptlrpc_unregister_service(m->mds_io_service);
+ m->mds_io_service = NULL;
+ }
mutex_unlock(&m->mds_health_mutex);
EXIT;
GOTO(err_mds_svc, rc);
}
+ memset(&conf, 0, sizeof(conf));
+ conf = (typeof(conf)) {
+ .psc_name = LUSTRE_MDT_NAME "_io",
+ .psc_watchdog_factor = MDT_SERVICE_WATCHDOG_FACTOR,
+ .psc_buf = {
+ .bc_nbufs = OST_NBUFS,
+ .bc_buf_size = OST_IO_BUFSIZE,
+ .bc_req_max_size = OST_IO_MAXREQSIZE,
+ .bc_rep_max_size = OST_IO_MAXREPSIZE,
+ .bc_req_portal = MDS_IO_PORTAL,
+ .bc_rep_portal = MDC_REPLY_PORTAL,
+ },
+ .psc_thr = {
+ .tc_thr_name = "ll_mdt_io",
+ .tc_thr_factor = OSS_THR_FACTOR,
+ .tc_nthrs_init = OSS_NTHRS_INIT,
+ .tc_nthrs_base = OSS_NTHRS_BASE,
+ .tc_nthrs_max = mds_max_io_threads,
+ .tc_cpu_affinity = 1,
+ .tc_ctx_tags = LCT_DT_THREAD | LCT_MD_THREAD,
+ },
+ .psc_ops = {
+ .so_thr_init = tgt_io_thread_init,
+ .so_thr_done = tgt_io_thread_done,
+ .so_req_handler = tgt_request_handle,
+ .so_req_printer = target_print_req,
+ },
+ };
+ m->mds_io_service = ptlrpc_register_service(&conf, m->mds_kset,
+ procfs_entry);
+ if (IS_ERR(m->mds_io_service)) {
+ rc = PTR_ERR(m->mds_io_service);
+ CERROR("failed to start MDT I/O service: %d\n", rc);
+ m->mds_io_service = NULL;
+ GOTO(err_mds_svc, rc);
+ }
+
EXIT;
err_mds_svc:
if (rc)
rc |= ptlrpc_service_health_check(mds->mds_mdsc_service);
rc |= ptlrpc_service_health_check(mds->mds_mdss_service);
rc |= ptlrpc_service_health_check(mds->mds_fld_service);
+ rc |= ptlrpc_service_health_check(mds->mds_io_service);
mutex_unlock(&mds->mds_health_mutex);
return rc != 0 ? 1 : 0;
TIMEOUT_GRANT);
}
-static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
+void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
/*
* ocd_grant is the total grant amount we're expect to hold: if we've
list_empty(&cli->cl_grant_shrink_list))
osc_add_shrink_grant(cli);
}
+EXPORT_SYMBOL(osc_init_grant);
/* We assume that the reason this OSC got a short read is because it read
* beyond the end of a stripe file; i.e. lustre is reading a sparse file
if (rc)
return ERR_PTR(rc);
- if ((fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid)) &&
- osd->od_is_ost) {
+ if ((fid_is_idif(fid) || fid_is_norm(fid) || fid_is_echo(fid))) {
/* The minimum block size must be at least page size otherwise
* it will break the assumption in tgt_thread_big_cache where
* the array size is PTLRPC_MAX_BRW_PAGES. It will also affect
EXIT;
}
EXPORT_SYMBOL(tgt_io_thread_done);
+
+/**
+ * Helper function for getting Data-on-MDT file server DLM lock
+ * if asked by client.
+ *
+ * Enqueues a local LDLM_IBITS lock on \a res_id whose policy requests
+ * the MDS_INODELOCK_DOM and MDS_INODELOCK_UPDATE bits, with no try bits.
+ *
+ * \param[in] ns namespace to enqueue in; asserted to be non-NULL
+ * \param[in] res_id resource to lock
+ * \param[in,out] lh lock handle; asserted to be non-NULL and not in use
+ * on entry, then handed to ldlm_cli_enqueue_local()
+ * \param[in] mode lock mode handed to ldlm_cli_enqueue_local()
+ * \param[in,out] flags enqueue flags handed to ldlm_cli_enqueue_local()
+ *
+ * \retval 0 if the enqueue returned ELDLM_OK
+ * \retval -EIO for any other enqueue result
+ */
+int tgt_mdt_data_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+ struct lustre_handle *lh, int mode, __u64 *flags)
+{
+ union ldlm_policy_data policy;
+ int rc;
+
+ ENTRY;
+
+ LASSERT(lh != NULL);
+ LASSERT(ns != NULL);
+ LASSERT(!lustre_handle_is_used(lh)); /* handle must not already be in use */
+
+ policy.l_inodebits.bits = MDS_INODELOCK_DOM | MDS_INODELOCK_UPDATE;
+ policy.l_inodebits.try_bits = 0;
+
+ rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy, mode,
+ flags, ldlm_blocking_ast,
+ ldlm_completion_ast, ldlm_glimpse_ast,
+ NULL, 0, LVB_T_NONE, NULL, lh);
+
+ RETURN(rc == ELDLM_OK ? 0 : -EIO); /* every non-OK enqueue result becomes -EIO */
+}
+EXPORT_SYMBOL(tgt_mdt_data_lock);
+
/**
* Helper function for getting server side [start, start+count] DLM lock
* if asked by client.
}
EXPORT_SYMBOL(tgt_extent_unlock);
-int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
- struct obd_ioobj *obj, struct niobuf_remote *nb,
- struct lustre_handle *lh, enum ldlm_mode mode)
+static int tgt_brw_lock(struct obd_export *exp, struct ldlm_res_id *res_id,
+ struct obd_ioobj *obj, struct niobuf_remote *nb,
+ struct lustre_handle *lh, enum ldlm_mode mode)
{
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
__u64 flags = 0;
int nrbufs = obj->ioo_bufcnt;
int i;
+ int rc;
ENTRY;
if (!(nb[i].rnb_flags & OBD_BRW_SRVLOCK))
RETURN(-EFAULT);
- RETURN(tgt_extent_lock(ns, res_id, nb[0].rnb_offset,
- nb[nrbufs - 1].rnb_offset +
- nb[nrbufs - 1].rnb_len - 1,
- lh, mode, &flags));
+ /* MDT IO for data-on-mdt */
+ if (exp->exp_connect_data.ocd_connect_flags & OBD_CONNECT_IBITS)
+ rc = tgt_mdt_data_lock(ns, res_id, lh, mode, &flags);
+ else
+ rc = tgt_extent_lock(ns, res_id, nb[0].rnb_offset,
+ nb[nrbufs - 1].rnb_offset +
+ nb[nrbufs - 1].rnb_len - 1,
+ lh, mode, &flags);
+ RETURN(rc);
}
-void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
- struct lustre_handle *lh, enum ldlm_mode mode)
+static void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
+ struct lustre_handle *lh, enum ldlm_mode mode)
{
ENTRY;
ENTRY;
- if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
+ if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL &&
+ ptlrpc_req2svc(req)->srv_req_portal != MDS_IO_PORTAL) {
CERROR("%s: deny read request from %s to portal %u\n",
tgt_name(tsi->tsi_tgt),
obd_export_nid2str(req->rq_export),
local_nb = tbc->local;
- rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
- remote_nb, &lockh, LCK_PR);
+ rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh,
+ LCK_PR);
if (rc != 0)
RETURN(rc);
ENTRY;
- if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
+ if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL &&
+ ptlrpc_req2svc(req)->srv_req_portal != MDS_IO_PORTAL) {
CERROR("%s: deny write request from %s to portal %u\n",
tgt_name(tsi->tsi_tgt),
obd_export_nid2str(req->rq_export),
local_nb = tbc->local;
- rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
- remote_nb, &lockh, LCK_PW);
+ rc = tgt_brw_lock(exp, &tsi->tsi_resid, ioo, remote_nb, &lockh,
+ LCK_PW);
if (rc != 0)
GOTO(out, rc);