* Additional fail id that can be set by handler.
*/
int tsi_reply_fail_id;
+ bool tsi_preprocessed;
/* request JobID */
char *tsi_jobid;
};
/* Request version for this opcode */
int th_version;
/* Handler function */
- int (*th_act)(struct tgt_session_info *tti);
+ int (*th_act)(struct tgt_session_info *tsi);
/* Handler function for high priority requests */
- int (*th_hp)(struct tgt_session_info *tti);
+ void (*th_hp)(struct tgt_session_info *tsi);
/* Request format for this request */
const struct req_format *th_fmt;
};
void *tgt_cb_data;
};
+int tgt_hpreq_handler(struct ptlrpc_request *req);
+
/* target/tgt_main.c */
void tgt_boot_epoch_update(struct lu_target *lut);
int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
/*
* Unified target generic handlers macros and generic functions.
*/
-#define TGT_RPC_HANDLER(base, flags, opc, fn, fmt, version) \
+#define TGT_RPC_HANDLER_HP(base, flags, opc, fn, hp, fmt, version) \
[opc - base] = { \
.th_name = #opc, \
.th_fail_id = OBD_FAIL_ ## opc ## _NET, \
.th_flags = flags, \
.th_act = fn, \
.th_fmt = fmt, \
- .th_version = version \
+ .th_version = version, \
+ .th_hp = hp, \
}
+#define TGT_RPC_HANDLER(base, flags, opc, fn, fmt, version) \
+ TGT_RPC_HANDLER_HP(base, flags, opc, fn, NULL, fmt, version)
/* MDT Request with a format known in advance */
#define TGT_MDT_HDL(flags, name, fn) \
TGT_RPC_HANDLER(MDS_FIRST_OPC, flags, name, fn, NULL, \
LUSTRE_MDS_VERSION)
-/* MDT Request with a format known in advance */
+/* OST Request with a format known in advance */
#define TGT_OST_HDL(flags, name, fn) \
TGT_RPC_HANDLER(OST_FIRST_OPC, flags, name, fn, &RQF_ ## name, \
LUSTRE_OST_VERSION)
+#define TGT_OST_HDL_HP(flags, name, fn, hp) \
+ TGT_RPC_HANDLER_HP(OST_FIRST_OPC, flags, name, fn, hp, \
+ &RQF_ ## name, LUSTRE_OST_VERSION)
/* MGS request with a format known in advance */
#define TGT_MGS_HDL(flags, name, fn) \
struct ptlrpc_request_pool *rq_pool;
struct lu_context rq_session;
- struct lu_context rq_recov_session;
/** request format description */
struct req_capsule rq_pill;
struct ptlrpc_request *req,
svc_handler_t handler)
{
- int rc;
- ENTRY;
+ int rc;
- /**
- * export can be evicted during recovery, no need to handle replays for
- * it after that, discard such request silently
- */
- if (req->rq_export->exp_disconnected)
- GOTO(reqcopy_put, rc = 0);
+ ENTRY;
- rc = lu_context_init(&req->rq_recov_session, LCT_SERVER_SESSION);
- if (rc) {
- CERROR("Failure to initialize session: %d\n", rc);
- GOTO(reqcopy_put, rc);
- }
+ /**
+ * export can be evicted during recovery, no need to handle replays for
+ * it after that, discard such request silently
+ */
+ if (req->rq_export->exp_disconnected)
+ GOTO(reqcopy_put, rc = 0);
- req->rq_recov_session.lc_thread = thread;
- lu_context_enter(&req->rq_recov_session);
- req->rq_svc_thread = thread;
- req->rq_svc_thread->t_env->le_ses = &req->rq_recov_session;
+ req->rq_session.lc_thread = thread;
+ req->rq_svc_thread = thread;
+ req->rq_svc_thread->t_env->le_ses = &req->rq_session;
/* thread context */
lu_context_enter(&thread->t_env->le_ctx);
(void)handler(req);
lu_context_exit(&thread->t_env->le_ctx);
- lu_context_exit(&req->rq_recov_session);
- lu_context_fini(&req->rq_recov_session);
/* don't reset timer for final stage */
if (!exp_finished(req->rq_export)) {
int to = obd_timeout;
return rc;
}
-
static int ofd_quotactl(struct tgt_session_info *tsi)
{
struct obd_quotactl *oqctl, *repoqc;
RETURN(rc);
}
+/* High priority request handlers for OFD */
+
+/* prolong locks for the current service time of the corresponding
+ * portal (= OST_IO_PORTAL)
+ */
+static inline int prolong_timeout(struct ptlrpc_request *req)
+{
+ struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
+
+ if (AT_OFF)
+ return obd_timeout / 2;
+
+ return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
+ ldlm_timeout);
+}
+
+static int ofd_prolong_one_lock(struct tgt_session_info *tsi,
+ struct ldlm_lock *lock,
+ struct ldlm_extent *extent, int timeout)
+{
+
+ if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
+ return 0;
+
+ /* XXX: never try to grab resource lock here because we're inside
+ * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
+ * res lock and then exp_bl_list_lock. */
+
+ if (!(lock->l_flags & LDLM_FL_AST_SENT))
+ /* ignore locks not being cancelled */
+ return 0;
+
+ LDLM_DEBUG(lock, "refreshed for req x"LPU64" ext("LPU64"->"LPU64") "
+ "to %ds.\n", tgt_ses_req(tsi)->rq_xid, extent->start,
+ extent->end, timeout);
+
+ /* OK. this is a possible lock the user holds doing I/O
+ * let's refresh eviction timer for it */
+ ldlm_refresh_waiting_lock(lock, timeout);
+ return 1;
+}
+
+static int ofd_prolong_extent_locks(struct tgt_session_info *tsi,
+ __u64 start, __u64 end)
+{
+ struct obd_export *exp = tsi->tsi_exp;
+ struct obdo *oa = &tsi->tsi_ost_body->oa;
+ struct ldlm_extent extent = {
+ .start = start,
+ .end = end
+ };
+ struct ldlm_lock *lock;
+ int timeout = prolong_timeout(tgt_ses_req(tsi));
+ int lock_count = 0;
+
+ ENTRY;
+
+ if (oa->o_valid & OBD_MD_FLHANDLE) {
+ /* mostly a request should be covered by only one lock, try
+ * fast path. */
+ lock = ldlm_handle2lock(&oa->o_handle);
+ if (lock != NULL) {
+ /* Fast path to check if the lock covers the whole IO
+ * region exclusively. */
+ if (lock->l_granted_mode == LCK_PW &&
+ ldlm_extent_contain(&lock->l_policy_data.l_extent,
+ &extent)) {
+ /* bingo */
+ LASSERT(lock->l_export == exp);
+ lock_count = ofd_prolong_one_lock(tsi, lock,
+ &extent, timeout);
+ LDLM_LOCK_PUT(lock);
+ RETURN(lock_count);
+ }
+ LDLM_LOCK_PUT(lock);
+ }
+ }
+
+ spin_lock_bh(&exp->exp_bl_list_lock);
+ list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
+ LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+ LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
+
+ if (!ldlm_res_eq(&tsi->tsi_resid, &lock->l_resource->lr_name))
+ continue;
+
+ if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
+ &extent))
+ continue;
+
+ lock_count += ofd_prolong_one_lock(tsi, lock, &extent, timeout);
+ }
+ spin_unlock_bh(&exp->exp_bl_list_lock);
+
+ RETURN(lock_count);
+}
+
+/**
+ * Returns 1 if the given PTLRPC matches the given LDLM lock, or 0 if it does
+ * not.
+ */
+static int ofd_rw_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct niobuf_remote *rnb;
+ struct obd_ioobj *ioo;
+ ldlm_mode_t mode;
+ struct ldlm_extent ext;
+ __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+ ENTRY;
+
+ ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+ LASSERT(ioo != NULL);
+
+ rnb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+ LASSERT(rnb != NULL);
+
+ ext.start = rnb->offset;
+ rnb += ioo->ioo_bufcnt - 1;
+ ext.end = rnb->offset + rnb->len - 1;
+
+ LASSERT(lock->l_resource != NULL);
+ if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
+ RETURN(0);
+
+ mode = LCK_PW;
+ if (opc == OST_READ)
+ mode |= LCK_PR;
+
+ if (!(lock->l_granted_mode & mode))
+ RETURN(0);
+
+ RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
+}
+
+/**
+ * High-priority queue request check for whether the given PTLRPC request
+ * (\a req) is blocking an LDLM lock cancel.
+ *
+ * Returns 1 if the given PTLRPC request (\a req) is blocking an LDLM lock
+ * cancel, 0 if it is not, and -EFAULT if the request is malformed.
+ *
+ * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue. This
+ * function looks only at OST_READs and OST_WRITEs.
+ */
+static int ofd_rw_hpreq_check(struct ptlrpc_request *req)
+{
+ struct tgt_session_info *tsi;
+ struct obd_ioobj *ioo;
+ struct niobuf_remote *rnb;
+ __u64 start, end;
+ int lock_count;
+
+ ENTRY;
+
+ /* Don't use tgt_ses_info() to get session info, because lock_match()
+ * can be called while request has no processing thread yet. */
+ tsi = lu_context_key_get(&req->rq_session, &tgt_session_key);
+ LASSERT(tsi != NULL);
+
+ /*
+ * Use LASSERT below because malformed RPCs should have
+ * been filtered out in tgt_hpreq_handler().
+ */
+ ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+ LASSERT(ioo != NULL);
+
+ rnb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+ LASSERT(rnb != NULL);
+ LASSERT(!(rnb->flags & OBD_BRW_SRVLOCK));
+
+ start = rnb->offset;
+ rnb += ioo->ioo_bufcnt - 1;
+ end = rnb->offset + rnb->len - 1;
+
+ DEBUG_REQ(D_RPCTRACE, req, "%s %s: refresh rw locks: "DFID
+ " ("LPU64"->"LPU64")\n",
+ tgt_name(tsi->tsi_tgt), current->comm,
+ PFID(&tsi->tsi_fid), start, end);
+
+ lock_count = ofd_prolong_extent_locks(tsi, start, end);
+
+ CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
+ tgt_name(tsi->tsi_tgt), lock_count, req);
+
+ RETURN(lock_count > 0);
+}
+
+static void ofd_rw_hpreq_fini(struct ptlrpc_request *req)
+{
+ ofd_rw_hpreq_check(req);
+}
+
+/**
+ * Like ofd_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
+ */
+static int ofd_punch_hpreq_lock_match(struct ptlrpc_request *req,
+ struct ldlm_lock *lock)
+{
+ struct tgt_session_info *tsi;
+
+ /* Don't use tgt_ses_info() to get session info, because lock_match()
+ * can be called while request has no processing thread yet. */
+ tsi = lu_context_key_get(&req->rq_session, &tgt_session_key);
+ LASSERT(tsi != NULL);
+
+ LASSERT(tsi->tsi_ost_body != NULL);
+ if (tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLHANDLE &&
+ tsi->tsi_ost_body->oa.o_handle.cookie == lock->l_handle.h_cookie)
+ return 1;
+
+ return 0;
+}
+
+/**
+ * Like ofd_rw_hpreq_check(), but for OST_PUNCH RPCs.
+ */
+static int ofd_punch_hpreq_check(struct ptlrpc_request *req)
+{
+ struct tgt_session_info *tsi;
+ struct obdo *oa;
+ int lock_count;
+
+ ENTRY;
+
+ /* Don't use tgt_ses_info() to get session info, because lock_match()
+ * can be called while request has no processing thread yet. */
+ tsi = lu_context_key_get(&req->rq_session, &tgt_session_key);
+ LASSERT(tsi != NULL);
+ oa = &tsi->tsi_ost_body->oa;
+
+ LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS &&
+ oa->o_flags & OBD_FL_SRVLOCK));
+
+ CDEBUG(D_DLMTRACE,
+ "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
+ tgt_name(tsi->tsi_tgt), tsi->tsi_resid.name[0],
+ tsi->tsi_resid.name[1], oa->o_size, oa->o_blocks);
+
+ lock_count = ofd_prolong_extent_locks(tsi, oa->o_size, oa->o_blocks);
+
+ CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
+ tgt_name(tsi->tsi_tgt), lock_count, req);
+
+ RETURN(lock_count > 0);
+}
+
+static void ofd_punch_hpreq_fini(struct ptlrpc_request *req)
+{
+ ofd_punch_hpreq_check(req);
+}
+
+struct ptlrpc_hpreq_ops ofd_hpreq_rw = {
+ .hpreq_lock_match = ofd_rw_hpreq_lock_match,
+ .hpreq_check = ofd_rw_hpreq_check,
+ .hpreq_fini = ofd_rw_hpreq_fini
+};
+
+struct ptlrpc_hpreq_ops ofd_hpreq_punch = {
+ .hpreq_lock_match = ofd_punch_hpreq_lock_match,
+ .hpreq_check = ofd_punch_hpreq_check,
+ .hpreq_fini = ofd_punch_hpreq_fini
+};
+
+/** Assign high priority operations to the IO requests */
+static void ofd_hp_brw(struct tgt_session_info *tsi)
+{
+ struct niobuf_remote *rnb;
+ struct obd_ioobj *ioo;
+
+ ENTRY;
+
+ ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
+ LASSERT(ioo != NULL); /* must exist after request preprocessing */
+ if (ioo->ioo_bufcnt > 0) {
+ rnb = req_capsule_client_get(tsi->tsi_pill, &RMF_NIOBUF_REMOTE);
+ LASSERT(rnb != NULL); /* must exist after request preprocessing */
+
+ /* no high priority if server lock is needed */
+ if (rnb->flags & OBD_BRW_SRVLOCK)
+ return;
+ }
+ tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_rw;
+}
+
+static void ofd_hp_punch(struct tgt_session_info *tsi)
+{
+ LASSERT(tsi->tsi_ost_body != NULL); /* must exist if we are here */
+ /* no high-priority if server lock is needed */
+ if (tsi->tsi_ost_body->oa.o_valid & OBD_MD_FLFLAGS &&
+ tsi->tsi_ost_body->oa.o_flags & OBD_FL_SRVLOCK)
+ return;
+ tgt_ses_req(tsi)->rq_ops = &ofd_hpreq_punch;
+}
+
#define OBD_FAIL_OST_READ_NET OBD_FAIL_OST_BRW_NET
#define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
#define OST_BRW_READ OST_READ
TGT_OST_HDL(0 | HABEO_REFERO | MUTABOR,
OST_DESTROY, ofd_destroy_hdl),
TGT_OST_HDL(0 | HABEO_REFERO, OST_STATFS, ofd_statfs_hdl),
-TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO, OST_BRW_READ, tgt_brw_read),
+TGT_OST_HDL_HP(HABEO_CORPUS| HABEO_REFERO,
+ OST_BRW_READ, tgt_brw_read,
+ ofd_hp_brw),
/* don't set CORPUS flag for brw_write because -ENOENT may be valid case */
-TGT_OST_HDL(MUTABOR, OST_BRW_WRITE, tgt_brw_write),
-TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
- OST_PUNCH, ofd_punch_hdl),
+TGT_OST_HDL_HP(HABEO_CORPUS| MUTABOR, OST_BRW_WRITE, tgt_brw_write,
+ ofd_hp_brw),
+TGT_OST_HDL_HP(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
+ OST_PUNCH, ofd_punch_hdl,
+ ofd_hp_punch),
TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO, OST_SYNC, ofd_sync_hdl),
TGT_OST_HDL(0 | HABEO_REFERO, OST_QUOTACTL, ofd_quotactl),
};
CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
"CPU partitions OSS IO threads should run on");
-/**
- * Validate oa from client.
- * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
- * req are valid.
- * a. objects in Single MDT FS seq = FID_SEQ_OST_MDT0, oi_id != 0
- * b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
- * pack ost_id. Because non-zero oi_seq will make it diffcult to tell
- * whether this is oi_fid or real ostid. So it will check
- * OBD_CONNECT_FID, then convert the ostid to FID for old client.
- * c. Old FID-disable osc will send IDIF.
- * d. new FID-enable osc/osp will send normal FID.
- *
- * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
- * be used for LAST_ID file, and only being accessed inside OST now.
- */
-static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
- struct obd_ioobj *ioobj)
-{
- int rc = 0;
-
- if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
- fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
- /* Sigh 2.[123] client still sends echo req with oi_id = 0
- * during create, and we will reset this to 1, since this
- * oi_id is basically useless in the following create process,
- * but oi_id == 0 will make it difficult to tell whether it is
- * real FID or ost_id. */
- oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
- oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
- oa->o_oi.oi_fid.f_ver = 0;
- } else {
- if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
- GOTO(out, rc = -EPROTO);
-
- /* Note: this check might be forced in 2.5 or 2.6, i.e.
- * all of the requests are required to setup FLGROUP */
- if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
- ostid_set_seq_mdt0(&oa->o_oi);
- if (ioobj)
- ostid_set_seq_mdt0(&ioobj->ioo_oid);
- oa->o_valid |= OBD_MD_FLGROUP;
- }
-
- if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
- fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
- fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
- fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
- GOTO(out, rc = -EPROTO);
- }
-
- if (ioobj != NULL) {
- unsigned max_brw = ioobj_max_brw_get(ioobj);
-
- if (unlikely((max_brw & (max_brw - 1)) != 0)) {
- CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
- ": rc = -EPROTO\n", exp->exp_obd->obd_name,
- obd_export_nid2str(exp), max_brw,
- POSTID(&oa->o_oi));
- GOTO(out, rc = -EPROTO);
- }
- ioobj->ioo_oid = oa->o_oi;
- }
-
-out:
- if (rc != 0)
- CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
- exp->exp_obd->obd_name, obd_export_nid2str(exp),
- oa ? ostid_seq(&oa->o_oi) : -1,
- oa ? ostid_id(&oa->o_oi) : -1, rc);
- return rc;
-}
-
-struct ost_prolong_data {
- struct ptlrpc_request *opd_req;
- struct obd_export *opd_exp;
- struct obdo *opd_oa;
- struct ldlm_res_id opd_resid;
- struct ldlm_extent opd_extent;
- ldlm_mode_t opd_mode;
- unsigned int opd_locks;
- int opd_timeout;
-};
-
-/* prolong locks for the current service time of the corresponding
- * portal (= OST_IO_PORTAL)
- */
-static inline int prolong_timeout(struct ptlrpc_request *req)
-{
- struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
-
- if (AT_OFF)
- return obd_timeout / 2;
-
- return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
- ldlm_timeout);
-}
-
-static void ost_prolong_lock_one(struct ost_prolong_data *opd,
- struct ldlm_lock *lock)
-{
- LASSERT(lock->l_export == opd->opd_exp);
-
- if (lock->l_flags & LDLM_FL_DESTROYED) /* lock already cancelled */
- return;
-
- /* XXX: never try to grab resource lock here because we're inside
- * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
- * res lock and then exp_bl_list_lock. */
-
- if (!(lock->l_flags & LDLM_FL_AST_SENT))
- /* ignore locks not being cancelled */
- return;
-
- LDLM_DEBUG(lock,
- "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
- opd->opd_req->rq_xid, opd->opd_extent.start,
- opd->opd_extent.end, opd->opd_timeout);
-
- /* OK. this is a possible lock the user holds doing I/O
- * let's refresh eviction timer for it */
- ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
- ++opd->opd_locks;
-}
-
-static void ost_prolong_locks(struct ost_prolong_data *data)
-{
- struct obd_export *exp = data->opd_exp;
- struct obdo *oa = data->opd_oa;
- struct ldlm_lock *lock;
- ENTRY;
-
- if (oa->o_valid & OBD_MD_FLHANDLE) {
- /* mostly a request should be covered by only one lock, try
- * fast path. */
- lock = ldlm_handle2lock(&oa->o_handle);
- if (lock != NULL) {
- /* Fast path to check if the lock covers the whole IO
- * region exclusively. */
- if (lock->l_granted_mode == LCK_PW &&
- ldlm_extent_contain(&lock->l_policy_data.l_extent,
- &data->opd_extent)) {
- /* bingo */
- ost_prolong_lock_one(data, lock);
- LDLM_LOCK_PUT(lock);
- RETURN_EXIT;
- }
- LDLM_LOCK_PUT(lock);
- }
- }
-
-
- spin_lock_bh(&exp->exp_bl_list_lock);
- cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
- LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
- LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
-
- if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
- continue;
-
- if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
- &data->opd_extent))
- continue;
-
- ost_prolong_lock_one(data, lock);
- }
- spin_unlock_bh(&exp->exp_bl_list_lock);
-
- EXIT;
-}
-
-/**
- * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
- * not.
- */
-static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
- struct ldlm_lock *lock)
-{
- struct niobuf_remote *nb;
- struct obd_ioobj *ioo;
- int mode, opc;
- struct ldlm_extent ext;
- ENTRY;
-
- opc = lustre_msg_get_opc(req->rq_reqmsg);
- LASSERT(opc == OST_READ || opc == OST_WRITE);
-
- ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
- LASSERT(ioo != NULL);
-
- nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
- LASSERT(nb != NULL);
-
- ext.start = nb->offset;
- nb += ioo->ioo_bufcnt - 1;
- ext.end = nb->offset + nb->len - 1;
-
- LASSERT(lock->l_resource != NULL);
- if (!ostid_res_name_eq(&ioo->ioo_oid, &lock->l_resource->lr_name))
- RETURN(0);
-
- mode = LCK_PW;
- if (opc == OST_READ)
- mode |= LCK_PR;
- if (!(lock->l_granted_mode & mode))
- RETURN(0);
-
- RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
-}
-
-/**
- * High-priority queue request check for whether the given PTLRPC request (\a
- * req) is blocking an LDLM lock cancel.
- *
- * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
- * cancel, 0 if it is not, and -EFAULT if the request is malformed.
- *
- * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue. This
- * function looks only at OST_READs and OST_WRITEs.
- */
-static int ost_rw_hpreq_check(struct ptlrpc_request *req)
-{
- struct obd_device *obd = req->rq_export->exp_obd;
- struct ost_body *body;
- struct obd_ioobj *ioo;
- struct niobuf_remote *nb;
- struct ost_prolong_data opd = { 0 };
- int mode, opc;
- ENTRY;
-
- /*
- * Use LASSERT to do sanity check because malformed RPCs should have
- * been filtered out in ost_hpreq_handler().
- */
- opc = lustre_msg_get_opc(req->rq_reqmsg);
- LASSERT(opc == OST_READ || opc == OST_WRITE);
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT(body != NULL);
-
- ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
- LASSERT(ioo != NULL);
-
- nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
- LASSERT(nb != NULL);
- LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
-
- ostid_build_res_name(&ioo->ioo_oid, &opd.opd_resid);
-
- opd.opd_req = req;
- mode = LCK_PW;
- if (opc == OST_READ)
- mode |= LCK_PR;
- opd.opd_mode = mode;
- opd.opd_exp = req->rq_export;
- opd.opd_oa = &body->oa;
- opd.opd_extent.start = nb->offset;
- nb += ioo->ioo_bufcnt - 1;
- opd.opd_extent.end = nb->offset + nb->len - 1;
- opd.opd_timeout = prolong_timeout(req);
-
- DEBUG_REQ(D_RPCTRACE, req,
- "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
- obd->obd_name, current->comm,
- opd.opd_resid.name[0], opd.opd_resid.name[1],
- opd.opd_extent.start, opd.opd_extent.end);
-
- ost_prolong_locks(&opd);
-
- CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
- obd->obd_name, opd.opd_locks, req);
-
- RETURN(opd.opd_locks > 0);
-}
-
-static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
-{
- (void)ost_rw_hpreq_check(req);
-}
-
-/**
- * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
- */
-static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
- struct ldlm_lock *lock)
-{
- struct ost_body *body;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT(body != NULL);
-
- if (body->oa.o_valid & OBD_MD_FLHANDLE &&
- body->oa.o_handle.cookie == lock->l_handle.h_cookie)
- RETURN(1);
-
- RETURN(0);
-}
-
-/**
- * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
- */
-static int ost_punch_hpreq_check(struct ptlrpc_request *req)
-{
- struct obd_device *obd = req->rq_export->exp_obd;
- struct ost_body *body;
- struct obdo *oa;
- struct ost_prolong_data opd = { 0 };
- __u64 start, end;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT(body != NULL);
-
- oa = &body->oa;
- LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
- !(oa->o_flags & OBD_FL_SRVLOCK));
-
- start = oa->o_size;
- end = start + oa->o_blocks;
-
- opd.opd_req = req;
- opd.opd_mode = LCK_PW;
- opd.opd_exp = req->rq_export;
- opd.opd_oa = oa;
- opd.opd_extent.start = start;
- opd.opd_extent.end = end;
- if (oa->o_blocks == OBD_OBJECT_EOF)
- opd.opd_extent.end = OBD_OBJECT_EOF;
- opd.opd_timeout = prolong_timeout(req);
-
- ostid_build_res_name(&oa->o_oi, &opd.opd_resid);
-
- CDEBUG(D_DLMTRACE,
- "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
- obd->obd_name,
- opd.opd_resid.name[0], opd.opd_resid.name[1],
- opd.opd_extent.start, opd.opd_extent.end);
-
- ost_prolong_locks(&opd);
-
- CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
- obd->obd_name, opd.opd_locks, req);
-
- RETURN(opd.opd_locks > 0);
-}
-
-static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
-{
- (void)ost_punch_hpreq_check(req);
-}
-
-struct ptlrpc_hpreq_ops ost_hpreq_rw = {
- .hpreq_lock_match = ost_rw_hpreq_lock_match,
- .hpreq_check = ost_rw_hpreq_check,
- .hpreq_fini = ost_rw_hpreq_fini
-};
-
-struct ptlrpc_hpreq_ops ost_hpreq_punch = {
- .hpreq_lock_match = ost_punch_hpreq_lock_match,
- .hpreq_check = ost_punch_hpreq_check,
- .hpreq_fini = ost_punch_hpreq_fini
-};
-
-/** Assign high priority operations to the request if needed. */
-static int ost_io_hpreq_handler(struct ptlrpc_request *req)
-{
- ENTRY;
- if (req->rq_export) {
- int opc = lustre_msg_get_opc(req->rq_reqmsg);
- struct ost_body *body;
-
- if (opc == OST_READ || opc == OST_WRITE) {
- struct niobuf_remote *nb;
- struct obd_ioobj *ioo;
- int objcount, niocount;
- int rc;
- int i;
-
- /* RPCs on the H-P queue can be inspected before
- * ost_handler() initializes their pills, so we
- * initialize that here. Capsule initialization is
- * idempotent, as is setting the pill's format (provided
- * it doesn't change).
- */
- req_capsule_init(&req->rq_pill, req, RCL_SERVER);
- if (opc == OST_READ)
- req_capsule_set(&req->rq_pill,
- &RQF_OST_BRW_READ);
- else
- req_capsule_set(&req->rq_pill,
- &RQF_OST_BRW_WRITE);
-
- body = req_capsule_client_get(&req->rq_pill,
- &RMF_OST_BODY);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- RETURN(-EFAULT);
- }
-
- objcount = req_capsule_get_size(&req->rq_pill,
- &RMF_OBD_IOOBJ,
- RCL_CLIENT) /
- sizeof(*ioo);
- if (objcount == 0) {
- CERROR("Missing/short ioobj\n");
- RETURN(-EFAULT);
- }
- if (objcount > 1) {
- CERROR("too many ioobjs (%d)\n", objcount);
- RETURN(-EFAULT);
- }
-
- ioo = req_capsule_client_get(&req->rq_pill,
- &RMF_OBD_IOOBJ);
- if (ioo == NULL) {
- CERROR("Missing/short ioobj\n");
- RETURN(-EFAULT);
- }
-
- rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
- if (rc) {
- CERROR("invalid object ids\n");
- RETURN(rc);
- }
-
- for (niocount = i = 0; i < objcount; i++) {
- if (ioo[i].ioo_bufcnt == 0) {
- CERROR("ioo[%d] has zero bufcnt\n", i);
- RETURN(-EFAULT);
- }
- niocount += ioo[i].ioo_bufcnt;
- }
- if (niocount > PTLRPC_MAX_BRW_PAGES) {
- DEBUG_REQ(D_RPCTRACE, req,
- "bulk has too many pages (%d)",
- niocount);
- RETURN(-EFAULT);
- }
-
- nb = req_capsule_client_get(&req->rq_pill,
- &RMF_NIOBUF_REMOTE);
- if (nb == NULL) {
- CERROR("Missing/short niobuf\n");
- RETURN(-EFAULT);
- }
-
- if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
- req->rq_ops = &ost_hpreq_rw;
- } else if (opc == OST_PUNCH) {
- req_capsule_init(&req->rq_pill, req, RCL_SERVER);
- req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
-
- body = req_capsule_client_get(&req->rq_pill,
- &RMF_OST_BODY);
- if (body == NULL) {
- CERROR("Missing/short ost_body\n");
- RETURN(-EFAULT);
- }
-
- if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
- !(body->oa.o_flags & OBD_FL_SRVLOCK))
- req->rq_ops = &ost_hpreq_punch;
- }
- }
- RETURN(0);
-}
-
#define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
static struct cfs_cpt_table *ost_io_cptable;
.so_thr_init = tgt_io_thread_init,
.so_thr_done = tgt_io_thread_done,
.so_req_handler = tgt_request_handle,
- .so_hpreq_handler = ost_io_hpreq_handler,
+ .so_hpreq_handler = tgt_hpreq_handler,
.so_req_printer = target_print_req,
},
};
return (rc == -ENOTCONN || rc == -ENODEV);
}
-#ifdef HAVE_SERVER_SUPPORT
+#if defined HAVE_SERVER_SUPPORT && defined(__KERNEL__)
int tgt_mod_init(void);
void tgt_mod_exit(void);
#else
__init int ptlrpc_init(void)
{
- int rc, cleanup_phase = 0;
- ENTRY;
+ int rc;
- lustre_assert_wire_constants();
+ ENTRY;
+
+ lustre_assert_wire_constants();
#if RS_DEBUG
spin_lock_init(&ptlrpc_rs_debug_lock);
#endif
mutex_init(&ptlrpc_all_services_mutex);
mutex_init(&pinger_mutex);
mutex_init(&ptlrpcd_mutex);
- ptlrpc_init_xid();
+ ptlrpc_init_xid();
rc = req_layout_init();
if (rc)
RETURN(rc);
+ rc = tgt_mod_init();
+ if (rc)
+ GOTO(err_layout, rc);
+
rc = ptlrpc_hr_init();
if (rc)
- RETURN(rc);
+ GOTO(err_tgt, rc);
- cleanup_phase = 1;
rc = ptlrpc_request_cache_init();
if (rc)
- GOTO(cleanup, rc);
+ GOTO(err_hr, rc);
- cleanup_phase = 2;
rc = ptlrpc_init_portals();
if (rc)
- GOTO(cleanup, rc);
-
- cleanup_phase = 3;
+ GOTO(err_cache, rc);
rc = ptlrpc_connection_init();
if (rc)
- GOTO(cleanup, rc);
+ GOTO(err_portals, rc);
- cleanup_phase = 4;
ptlrpc_put_connection_superhack = ptlrpc_connection_put;
rc = ptlrpc_start_pinger();
if (rc)
- GOTO(cleanup, rc);
+ GOTO(err_conn, rc);
- cleanup_phase = 5;
rc = ldlm_init();
if (rc)
- GOTO(cleanup, rc);
+ GOTO(err_pinger, rc);
- cleanup_phase = 6;
rc = sptlrpc_init();
if (rc)
- GOTO(cleanup, rc);
+ GOTO(err_ldlm, rc);
- cleanup_phase = 7;
rc = ptlrpc_nrs_init();
if (rc)
- GOTO(cleanup, rc);
-
-#ifdef __KERNEL__
- cleanup_phase = 8;
- rc = tgt_mod_init();
- if (rc)
- GOTO(cleanup, rc);
-#endif
- RETURN(0);
-
-cleanup:
- switch(cleanup_phase) {
-#ifdef __KERNEL__
- case 8:
- ptlrpc_nrs_fini();
-#endif
- case 7:
- sptlrpc_fini();
- case 6:
- ldlm_exit();
- case 5:
- ptlrpc_stop_pinger();
- case 4:
- ptlrpc_connection_fini();
- case 3:
- ptlrpc_exit_portals();
- case 2:
- ptlrpc_request_cache_fini();
- case 1:
- ptlrpc_hr_fini();
- req_layout_fini();
- default: ;
- }
-
- return rc;
+ GOTO(err_sptlrpc, rc);
+
+ RETURN(0);
+err_sptlrpc:
+ sptlrpc_fini();
+err_ldlm:
+ ldlm_exit();
+err_pinger:
+ ptlrpc_stop_pinger();
+err_conn:
+ ptlrpc_connection_fini();
+err_portals:
+ ptlrpc_exit_portals();
+err_cache:
+ ptlrpc_request_cache_fini();
+err_hr:
+ ptlrpc_hr_fini();
+err_tgt:
+ tgt_mod_exit();
+err_layout:
+ req_layout_fini();
+ return rc;
}
#ifdef __KERNEL__
static void __exit ptlrpc_exit(void)
{
- tgt_mod_exit();
ptlrpc_nrs_fini();
- sptlrpc_fini();
- ldlm_exit();
- ptlrpc_stop_pinger();
- ptlrpc_exit_portals();
+ sptlrpc_fini();
+ ldlm_exit();
+ ptlrpc_stop_pinger();
+ ptlrpc_exit_portals();
ptlrpc_request_cache_fini();
- ptlrpc_hr_fini();
- ptlrpc_connection_fini();
+ ptlrpc_hr_fini();
+ ptlrpc_connection_fini();
+ tgt_mod_exit();
+ req_layout_fini();
}
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
struct ptlrpc_service_part *svcpt = rqbd->rqbd_svcpt;
struct ptlrpc_service *svc = svcpt->scp_service;
- int refcount;
- cfs_list_t *tmp;
- cfs_list_t *nxt;
+ int refcount;
+ cfs_list_t *tmp;
+ cfs_list_t *nxt;
- if (!cfs_atomic_dec_and_test(&req->rq_refcount))
- return;
+ if (!cfs_atomic_dec_and_test(&req->rq_refcount))
+ return;
+
+ if (req->rq_session.lc_state == LCS_ENTERED) {
+ lu_context_exit(&req->rq_session);
+ lu_context_fini(&req->rq_session);
+ }
if (req->rq_at_linked) {
spin_lock(&svcpt->scp_at_lock);
{
ptlrpc_server_hpreq_fini(req);
- if (req->rq_session.lc_thread != NULL) {
- lu_context_exit(&req->rq_session);
- lu_context_fini(&req->rq_session);
- }
-
ptlrpc_server_drop_request(req);
}
if (rc < 0)
RETURN(rc);
+ /* the current thread is no longer the processing thread for this
+ * request, but the request is in exp_hp_list and can be found there.
+ * Remove all relations between request and old thread. */
+ req->rq_svc_thread->t_env->le_ses = NULL;
+ req->rq_svc_thread = NULL;
+ req->rq_session.lc_thread = NULL;
+
ptlrpc_nrs_req_add(svcpt, req, !!rc);
RETURN(0);
}
req->rq_session.lc_thread = thread;
lu_context_enter(&req->rq_session);
- req->rq_svc_thread->t_env->le_ses = &req->rq_session;
+ thread->t_env->le_ses = &req->rq_session;
}
ptlrpc_at_add_timed(req);
/* re-assign request and sesson thread to the current one */
request->rq_svc_thread = thread;
if (thread != NULL) {
- LASSERT(request->rq_session.lc_thread != NULL);
+ LASSERT(request->rq_session.lc_thread == NULL);
request->rq_session.lc_thread = thread;
- request->rq_session.lc_cookie = 0x55;
thread->t_env->le_ses = &request->rq_session;
}
svc->srv_ops.so_req_handler(request);
ptlrpc_start_thread(svcpt, 0);
}
+ /* reset le_ses to initial state */
+ env->le_ses = NULL;
/* Process all incoming reqs before handling any */
if (ptlrpc_server_request_incoming(svcpt)) {
lu_context_enter(&env->le_ctx);
- env->le_ses = NULL;
ptlrpc_server_handle_req_in(svcpt, thread);
lu_context_exit(&env->le_ctx);
} else {
rc = PTR_ERR(obj);
}
+
+ tsi->tsi_fid = body->fid1;
+
RETURN(rc);
}
}
EXPORT_SYMBOL(tgt_validate_obdo);
+static int tgt_io_data_unpack(struct tgt_session_info *tsi, struct ost_id *oi)
+{
+ unsigned max_brw;
+ struct niobuf_remote *rnb;
+ struct obd_ioobj *ioo;
+ int obj_count;
+
+ ENTRY;
+
+ ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
+ if (ioo == NULL)
+ RETURN(-EPROTO);
+
+ rnb = req_capsule_client_get(tsi->tsi_pill, &RMF_NIOBUF_REMOTE);
+ if (rnb == NULL)
+ RETURN(-EPROTO);
+
+ max_brw = ioobj_max_brw_get(ioo);
+ if (unlikely((max_brw & (max_brw - 1)) != 0)) {
+ CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
+ ": rc = %d\n", tgt_name(tsi->tsi_tgt),
+ obd_export_nid2str(tsi->tsi_exp), max_brw,
+ POSTID(oi), -EPROTO);
+ RETURN(-EPROTO);
+ }
+ ioo->ioo_oid = *oi;
+
+ obj_count = req_capsule_get_size(tsi->tsi_pill, &RMF_OBD_IOOBJ,
+ RCL_CLIENT) / sizeof(*ioo);
+ if (obj_count == 0) {
+ CERROR("%s: short ioobj\n", tgt_name(tsi->tsi_tgt));
+ RETURN(-EPROTO);
+ } else if (obj_count > 1) {
+ CERROR("%s: too many ioobjs (%d)\n", tgt_name(tsi->tsi_tgt),
+ obj_count);
+ RETURN(-EPROTO);
+ }
+
+ if (ioo->ioo_bufcnt == 0) {
+ CERROR("%s: ioo has zero bufcnt\n", tgt_name(tsi->tsi_tgt));
+ RETURN(-EPROTO);
+ }
+
+ if (ioo->ioo_bufcnt > PTLRPC_MAX_BRW_PAGES) {
+ DEBUG_REQ(D_RPCTRACE, tgt_ses_req(tsi),
+ "bulk has too many pages (%d)",
+ ioo->ioo_bufcnt);
+ RETURN(-EPROTO);
+ }
+
+ RETURN(0);
+}
+
static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
{
struct ost_body *body;
struct req_capsule *pill = tsi->tsi_pill;
struct lustre_capa *capa;
- struct obd_ioobj *ioo;
int rc;
ENTRY;
RETURN(rc);
if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(tsi->tsi_pill, &RMF_CAPA1);
+ capa = req_capsule_client_get(pill, &RMF_CAPA1);
if (capa == NULL) {
CERROR("%s: OSSCAPA flag is set without capability\n",
tgt_name(tsi->tsi_tgt));
tsi->tsi_ost_body = body;
if (req_capsule_has_field(pill, &RMF_OBD_IOOBJ, RCL_CLIENT)) {
- unsigned max_brw;
- struct niobuf_remote *rnb;
-
- ioo = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
- if (ioo == NULL)
- RETURN(-EPROTO);
-
- rnb = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
- if (rnb == NULL)
- RETURN(-EPROTO);
-
- max_brw = ioobj_max_brw_get(ioo);
- if (unlikely((max_brw & (max_brw - 1)) != 0)) {
- CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
- ": rc = %d\n", tgt_name(tsi->tsi_tgt),
- obd_export_nid2str(tsi->tsi_exp), max_brw,
- POSTID(&body->oa.o_oi), -EPROTO);
- RETURN(-EPROTO);
- }
- ioo->ioo_oid = body->oa.o_oi;
+ rc = tgt_io_data_unpack(tsi, &body->oa.o_oi);
+ if (rc < 0)
+ RETURN(rc);
}
if (!(body->oa.o_valid & OBD_MD_FLID)) {
RETURN(rc);
}
-static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
+/*
+ * Do necessary preprocessing according to handler ->th_flags.
+ *
+ * Unpacks the MDT/OST body for formatted requests, rejects modifying
+ * requests (MUTABOR) on read-only exports, and extracts the DLM request
+ * for HABEO_CLAVIS handlers into tsi->tsi_dlm_req.
+ *
+ * May be called from both the HP request path (tgt_hpreq_handler) and the
+ * normal handler path; tsi_preprocessed makes the second call a no-op.
+ */
+static int tgt_request_preprocess(struct tgt_session_info *tsi,
+ struct tgt_handler *h,
+ struct ptlrpc_request *req)
{
 struct req_capsule *pill = tsi->tsi_pill;
- int rc;
+ __u32 flags = h->th_flags;
+ int rc = 0;
 ENTRY;
- if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
- rc = tgt_mdt_body_unpack(tsi, flags);
- } else if (req_capsule_has_field(pill, &RMF_OST_BODY, RCL_CLIENT)) {
- rc = tgt_ost_body_unpack(tsi, flags);
- } else {
- rc = 0;
+ /* already done, e.g. by the HP request handler */
+ if (tsi->tsi_preprocessed)
+ RETURN(0);
+
+ LASSERT(h->th_act != NULL);
+ LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
+ LASSERT(current->journal_info == NULL);
+
+ LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
+ h->th_fmt != NULL));
+ if (h->th_fmt != NULL) {
+ req_capsule_set(pill, h->th_fmt);
+ if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
+ rc = tgt_mdt_body_unpack(tsi, flags);
+ if (rc < 0)
+ RETURN(rc);
+ } else if (req_capsule_has_field(pill, &RMF_OST_BODY,
+ RCL_CLIENT)) {
+ rc = tgt_ost_body_unpack(tsi, flags);
+ if (rc < 0)
+ RETURN(rc);
+ }
+ }
- if (rc == 0 && flags & HABEO_REFERO) {
- /* Pack reply */
- if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
- req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
- tsi->tsi_mdt_body->eadatasize);
- if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
- req_capsule_set_size(pill, &RMF_LOGCOOKIES,
- RCL_SERVER, 0);
+ /* a read-only connection may not run modifying handlers */
+ if (flags & MUTABOR && tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
+ RETURN(-EROFS);
+
+ if (flags & HABEO_CLAVIS) {
+ struct ldlm_request *dlm_req;
- rc = req_capsule_server_pack(pill);
+ LASSERT(h->th_fmt != NULL);
+
+ dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ);
+ if (dlm_req != NULL) {
+ if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
+ LDLM_IBITS &&
+ dlm_req->lock_desc.l_policy_data.\
+ l_inodebits.bits == 0)) {
+ /*
+ * Lock without inodebits makes no sense and
+ * will oops later in ldlm. If client miss to
+ * set such bits, do not trigger ASSERTION.
+ *
+ * For liblustre flock case, it maybe zero.
+ */
+ rc = -EPROTO;
+ } else {
+ tsi->tsi_dlm_req = dlm_req;
+ }
+ } else {
+ rc = -EFAULT;
+ }
 }
+ tsi->tsi_preprocessed = 1;
 RETURN(rc);
}
{
int serious = 0;
int rc;
- __u32 flags;
ENTRY;
- LASSERT(h->th_act != NULL);
- LASSERT(h->th_opc == lustre_msg_get_opc(req->rq_reqmsg));
- LASSERT(current->journal_info == NULL);
-
/*
* Checking for various OBD_FAIL_$PREF_$OPC_NET codes. _Do_ not try
* to put same checks into handlers like mdt_close(), mdt_reint(),
if (OBD_FAIL_CHECK_ORSET(h->th_fail_id, OBD_FAIL_ONCE))
RETURN(0);
- rc = 0;
- flags = h->th_flags;
- LASSERT(ergo(flags & (HABEO_CORPUS | HABEO_REFERO),
- h->th_fmt != NULL));
- if (h->th_fmt != NULL) {
- req_capsule_set(tsi->tsi_pill, h->th_fmt);
- rc = tgt_unpack_req_pack_rep(tsi, flags);
- }
-
- if (rc == 0 && flags & MUTABOR &&
- tgt_conn_flags(tsi) & OBD_CONNECT_RDONLY)
- rc = -EROFS;
-
- if (rc == 0 && flags & HABEO_CLAVIS) {
- struct ldlm_request *dlm_req;
-
- LASSERT(h->th_fmt != NULL);
+ rc = tgt_request_preprocess(tsi, h, req);
+ /* pack reply if reply format is fixed */
+ if (rc == 0 && h->th_flags & HABEO_REFERO) {
+ /* Pack reply */
+ if (req_capsule_has_field(tsi->tsi_pill, &RMF_MDT_MD,
+ RCL_SERVER))
+ req_capsule_set_size(tsi->tsi_pill, &RMF_MDT_MD,
+ RCL_SERVER,
+ tsi->tsi_mdt_body->eadatasize);
+ if (req_capsule_has_field(tsi->tsi_pill, &RMF_LOGCOOKIES,
+ RCL_SERVER))
+ req_capsule_set_size(tsi->tsi_pill, &RMF_LOGCOOKIES,
+ RCL_SERVER, 0);
- dlm_req = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
- if (dlm_req != NULL) {
- if (unlikely(dlm_req->lock_desc.l_resource.lr_type ==
- LDLM_IBITS &&
- dlm_req->lock_desc.l_policy_data.\
- l_inodebits.bits == 0)) {
- /*
- * Lock without inodebits makes no sense and
- * will oops later in ldlm. If client miss to
- * set such bits, do not trigger ASSERTION.
- *
- * For liblustre flock case, it maybe zero.
- */
- rc = -EPROTO;
- } else {
- tsi->tsi_dlm_req = dlm_req;
- }
- } else {
- rc = -EFAULT;
- }
+ rc = req_capsule_server_pack(tsi->tsi_pill);
}
if (likely(rc == 0)) {
RETURN(+1);
}
+/*
+ * Initial check for request, it is validation mostly.
+ *
+ * Looks up the tgt_handler for the request's opcode: first finds the
+ * opcode slice covering the opcode in the target's lut_slice table, then
+ * indexes the per-opcode handler inside that slice.
+ *
+ * Returns the handler, or ERR_PTR(-ENOTSUPP) when no slice covers the
+ * opcode or the slice entry for it is empty (th_opc == 0).
+ */
+static struct tgt_handler *tgt_handler_find_check(struct ptlrpc_request *req)
+{
+ struct tgt_handler *h;
+ struct tgt_opc_slice *s;
+ struct lu_target *tgt;
+ __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+
+ ENTRY;
+
+ tgt = class_exp2tgt(req->rq_export);
+
+ for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
+ if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
+ break;
+
+ /* opcode was not found in slice */
+ if (unlikely(s->tos_hs == NULL)) {
+ CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt),
+ opc);
+ RETURN(ERR_PTR(-ENOTSUPP));
+ }
+
+ LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
+ h = s->tos_hs + (opc - s->tos_opc_start);
+ /* a hole in the handler table: opcode is inside the slice range
+ * but has no handler registered for it */
+ if (unlikely(h->th_opc == 0)) {
+ CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
+ RETURN(ERR_PTR(-ENOTSUPP));
+ }
+
+ RETURN(h);
+}
+
int tgt_request_handle(struct ptlrpc_request *req)
{
struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
struct lustre_msg *msg = req->rq_reqmsg;
struct tgt_handler *h;
- struct tgt_opc_slice *s;
struct lu_target *tgt;
int request_fail_id = 0;
__u32 opc = lustre_msg_get_opc(msg);
request_fail_id = tgt->lut_request_fail_id;
tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
- for (s = tgt->lut_slice; s->tos_hs != NULL; s++)
- if (s->tos_opc_start <= opc && opc < s->tos_opc_end)
- break;
-
- /* opcode was not found in slice */
- if (unlikely(s->tos_hs == NULL)) {
- CERROR("%s: no handlers for opcode 0x%x\n", tgt_name(tgt), opc);
- req->rq_status = -ENOTSUPP;
+ h = tgt_handler_find_check(req);
+ if (IS_ERR(h)) {
+ req->rq_status = PTR_ERR(h);
rc = ptlrpc_error(req);
GOTO(out, rc);
}
if (CFS_FAIL_CHECK_ORSET(request_fail_id, CFS_FAIL_ONCE))
GOTO(out, rc = 0);
- LASSERT(current->journal_info == NULL);
-
- LASSERT(opc >= s->tos_opc_start && opc < s->tos_opc_end);
- h = s->tos_hs + (opc - s->tos_opc_start);
- if (unlikely(h->th_opc == 0)) {
- CERROR("%s: unsupported opcode 0x%x\n", tgt_name(tgt), opc);
- req->rq_status = -ENOTSUPP;
- rc = ptlrpc_error(req);
- GOTO(out, rc);
- }
-
rc = lustre_msg_check_version(msg, h->th_version);
if (unlikely(rc)) {
DEBUG_REQ(D_ERROR, req, "%s: drop mal-formed request, version"
EXIT;
out:
req_capsule_fini(tsi->tsi_pill);
- tsi->tsi_pill = NULL;
if (tsi->tsi_corpus != NULL) {
lu_object_put(tsi->tsi_env, tsi->tsi_corpus);
tsi->tsi_corpus = NULL;
}
- tsi->tsi_env = NULL;
- tsi->tsi_mdt_body = NULL;
- tsi->tsi_dlm_req = NULL;
- fid_zero(&tsi->tsi_fid);
- memset(&tsi->tsi_resid, 0, sizeof tsi->tsi_resid);
return rc;
}
EXPORT_SYMBOL(tgt_request_handle);
+/**
+ * Assign high priority operations to the request if needed.
+ *
+ * Called from ptlrpc on request arrival (before the normal handler runs):
+ * sets up a minimal tgt_session_info, finds the opcode's handler, runs the
+ * common preprocessing (which marks tsi_preprocessed so the normal handler
+ * path will not repeat it) and then invokes the handler's ->th_hp callback,
+ * if any, to mark the request high priority.
+ *
+ * Returns 0 on success (or when the request has no export), negative errno
+ * on lookup/preprocess failure.
+ */
+int tgt_hpreq_handler(struct ptlrpc_request *req)
+{
+ struct tgt_session_info *tsi = tgt_ses_info(req->rq_svc_thread->t_env);
+ struct tgt_handler *h;
+ int rc;
+
+ ENTRY;
+
+ if (req->rq_export == NULL)
+ RETURN(0);
+
+ /* minimal session info setup; the full setup happens later in the
+ * regular request handler */
+ req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+ tsi->tsi_pill = &req->rq_pill;
+ tsi->tsi_env = req->rq_svc_thread->t_env;
+ tsi->tsi_tgt = class_exp2tgt(req->rq_export);
+ tsi->tsi_exp = req->rq_export;
+
+ h = tgt_handler_find_check(req);
+ if (IS_ERR(h)) {
+ rc = PTR_ERR(h);
+ RETURN(rc);
+ }
+
+ rc = tgt_request_preprocess(tsi, h, req);
+ if (unlikely(rc != 0))
+ RETURN(rc);
+
+ if (h->th_hp != NULL)
+ h->th_hp(tsi);
+ RETURN(0);
+}
+EXPORT_SYMBOL(tgt_hpreq_handler);
+
void tgt_counter_incr(struct obd_export *exp, int opcode)
{
lprocfs_counter_incr(exp->exp_obd->obd_stats, opcode);
/* Check if there is eviction in progress, and if so, wait for it to
* finish */
- if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+ if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
/* We do not care how long it takes */
lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
- !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
+ !atomic_read(&exp->exp_obd->obd_evict_inprogress),
&lwi);
}