#define LUT_REPLY_SLOTS_PER_CHUNK (1<<20)
#define LUT_REPLY_SLOTS_MAX_CHUNKS 16
+#define TRD_INDEX_MEMORY -1
+
/**
* Target reply data
*/
int sync);
int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt);
int tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd);
-int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
- struct tg_export_data *ted, struct tg_reply_data *trd,
- struct ptlrpc_request *req,
- struct thandle *th, bool update_lrd_file);
+int tgt_mk_reply_data(const struct lu_env *env, struct lu_target *tgt,
+ struct tg_export_data *ted, struct ptlrpc_request *req,
+ __u64 opdata, struct thandle *th, bool write_update,
+ __u64 transno);
struct tg_reply_data *tgt_lookup_reply_by_xid(struct tg_export_data *ted,
__u64 xid);
int tgt_tunables_init(struct lu_target *lut);
void *ei_namespace; /** lock namespace **/
u64 ei_inodebits; /** lock inode bits **/
unsigned int ei_enq_slave:1; /** whether enqueue slave stripes */
+ unsigned int ei_enq_slot:1; /** whether acquire rpc slot */
};
#define ei_res_id ei_cb_gl
struct ptlrpc_request;
struct obd_device;
-static inline void mdc_get_mod_rpc_slot(struct ptlrpc_request *req,
- struct lookup_intent *it)
-{
- struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
- __u32 opc;
- __u16 tag;
-
- opc = lustre_msg_get_opc(req->rq_reqmsg);
- tag = obd_get_mod_rpc_slot(cli, opc, it);
- lustre_msg_set_tag(req->rq_reqmsg, tag);
- ptlrpc_reassign_next_xid(req);
-}
-
-static inline void mdc_put_mod_rpc_slot(struct ptlrpc_request *req,
- struct lookup_intent *it)
-{
- struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
- __u32 opc;
- __u16 tag;
-
- opc = lustre_msg_get_opc(req->rq_reqmsg);
- tag = lustre_msg_get_tag(req->rq_reqmsg);
- obd_put_mod_rpc_slot(cli, opc, it, tag);
-}
-
-
/**
* Update the maximum possible easize.
*
__u64 ptlrpc_next_xid(void);
__u64 ptlrpc_sample_next_xid(void);
__u64 ptlrpc_req_xid(struct ptlrpc_request *request);
-void ptlrpc_reassign_next_xid(struct ptlrpc_request *req);
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req);
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req);
/* Set of routines to run a function in ptlrpcd context */
void *ptlrpcd_alloc_work(struct obd_import *imp,
int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max);
int obd_mod_rpc_stats_seq_show(struct client_obd *cli, struct seq_file *seq);
-__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
- struct lookup_intent *it);
-void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
- struct lookup_intent *it, __u16 tag);
+__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc);
+void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag);
struct llog_handle;
struct llog_rec_hdr;
int local = ns_is_client(ldlm_res_to_ns(res));
enum ldlm_error rc = ELDLM_OK;
struct ldlm_interval *node = NULL;
+#ifdef HAVE_SERVER_SUPPORT
+ bool reconstruct = false;
+#endif
ENTRY;
/* policies are not executed on the client or during replay */
if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
+#ifdef HAVE_SERVER_SUPPORT
+ reconstruct = !local && res->lr_type == LDLM_FLOCK &&
+ !(*flags & LDLM_FL_TEST_LOCK);
+ if (reconstruct) {
+ rc = req_can_reconstruct(cookie, NULL);
+ if (rc != 0) {
+ if (rc == 1)
+ rc = 0;
+ RETURN(rc);
+ }
+ }
+#endif
+
lock_res_and_lock(lock);
if (local && ldlm_is_granted(lock)) {
/* The server returned a blocked lock, but it was granted
out:
unlock_res_and_lock(lock);
+
+#ifdef HAVE_SERVER_SUPPORT
+ if (reconstruct) {
+ struct ptlrpc_request *req = cookie;
+
+ tgt_mk_reply_data(NULL, NULL,
+ &req->rq_export->exp_target_data,
+ req, 0, NULL, false, 0);
+ }
+#endif
if (node)
OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
return rc;
lock->l_req_extent = lock->l_policy_data.l_extent;
existing_lock:
- if (flags & LDLM_FL_HAS_INTENT) {
- /*
- * In this case, the reply buffer is allocated deep in
- * local_lock_enqueue by the policy function.
- */
- cookie = req;
- } else {
- /*
- * based on the assumption that lvb size never changes during
+ cookie = req;
+ if (!(flags & LDLM_FL_HAS_INTENT)) {
+ /* based on the assumption that lvb size never changes during
* resource life time otherwise it need resource->lr_lock's
- * protection
- */
+ * protection */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
RCL_SERVER, ldlm_lvbo_size(lock));
}
}
+/* Tell whether an enqueue for a lock of type @type has to take a request
+ * slot; this is the case for flock and inodebits locks only. */
+static bool ldlm_request_slot_needed(enum ldlm_type type)
+{
+	switch (type) {
+	case LDLM_FLOCK:
+	case LDLM_IBITS:
+		return true;
+	default:
+		return false;
+	}
+}
+
/**
* Finishing portion of client lock enqueue code.
*
ENTRY;
+ if (ldlm_request_slot_needed(type))
+ obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
+
+ ptlrpc_put_mod_rpc_slot(req);
+
if (req && req->rq_svc_thread)
env = req->rq_svc_thread->t_env;
LDLM_GLIMPSE_ENQUEUE);
}
+ /* It is important to obtain a modify RPC slot first (if applicable),
+ * so that threads waiting for a modify RPC slot do not pollute our
+ * RPCs-in-flight counter. */
+
+ if (einfo->ei_enq_slot)
+ ptlrpc_get_mod_rpc_slot(req);
+
+ if (ldlm_request_slot_needed(einfo->ei_type)) {
+ rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
+ if (rc) {
+ if (einfo->ei_enq_slot)
+ ptlrpc_put_mod_rpc_slot(req);
+ failed_lock_cleanup(ns, lock, einfo->ei_mode);
+ LDLM_LOCK_RELEASE(lock);
+ GOTO(out, rc);
+ }
+ }
+
if (async) {
LASSERT(reqp != NULL);
RETURN(0);
else
rc = err;
+out:
if (!req_passed_in && req != NULL) {
ptlrpc_req_finished(req);
if (reqp)
RETURN(rc);
}
+/* Decide whether the intent @it may be sent without a modify RPC slot.
+ * That holds for getattr, lookup and readdir intents, and for layout
+ * intents that do not carry MDS_FMODE_WRITE. A NULL intent never skips
+ * the slot. */
+static inline bool mdc_skip_mod_rpc_slot(const struct lookup_intent *it)
+{
+	if (it == NULL)
+		return false;
+
+	if (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
+	    it->it_op == IT_READDIR)
+		return true;
+
+	return it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE);
+}
+
/* We always reserve enough space in the reply packet for a stripe MD, because
* we don't know in advance the file type. */
static int mdc_enqueue_base(struct obd_export *exp,
__u64 extra_lock_flags)
{
struct obd_device *obddev = class_exp2obd(exp);
- struct ptlrpc_request *req = NULL;
+ struct ptlrpc_request *req;
__u64 flags, saved_flags = extra_lock_flags;
struct ldlm_res_id res_id;
static const union ldlm_policy_data lookup_policy = {
LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
einfo->ei_type);
res_id.name[3] = LDLM_FLOCK;
+ req = ldlm_enqueue_pack(exp, 0);
} else if (it->it_op & IT_OPEN) {
req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
req->rq_sent = ktime_get_real_seconds() + resends;
}
- /* It is important to obtain modify RPC slot first (if applicable), so
- * that threads that are waiting for a modify RPC slot are not polluting
- * our rpcs in flight counter.
- * We do not do flock request limiting, though */
- if (it) {
- mdc_get_mod_rpc_slot(req, it);
- rc = obd_get_request_slot(&obddev->u.cli);
- if (rc != 0) {
- mdc_put_mod_rpc_slot(req, it);
- mdc_clear_replay_flag(req, 0);
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
- }
+ einfo->ei_enq_slot = !mdc_skip_mod_rpc_slot(it);
/* With Data-on-MDT the glimpse callback is needed too.
* It is set here in advance but not in mdc_finish_enqueue()
rc = ldlm_cli_enqueue(exp, &req, einfo, &res_id, policy, &flags, NULL,
0, lvb_type, lockh, 0);
+
if (!it) {
/* For flock requests we immediatelly return without further
delay and let caller deal with the rest, since rest of
(einfo->ei_type == LDLM_FLOCK) &&
(einfo->ei_mode == LCK_NL))
goto resend;
+ ptlrpc_req_finished(req);
RETURN(rc);
}
- obd_put_request_slot(&obddev->u.cli);
- mdc_put_mod_rpc_slot(req, it);
-
if (rc < 0) {
CDEBUG(D_INFO,
"%s: ldlm_cli_enqueue "DFID":"DFID"=%s failed: rc = %d\n",
struct obd_export *exp = ga->ga_exp;
struct md_enqueue_info *minfo = ga->ga_minfo;
struct ldlm_enqueue_info *einfo = &minfo->mi_einfo;
- struct lookup_intent *it;
- struct lustre_handle *lockh;
- struct obd_device *obddev;
- struct ldlm_reply *lockrep;
- __u64 flags = LDLM_FL_HAS_INTENT;
+ struct lookup_intent *it;
+ struct lustre_handle *lockh;
+ struct ldlm_reply *lockrep;
+ __u64 flags = LDLM_FL_HAS_INTENT;
ENTRY;
it = &minfo->mi_it;
lockh = &minfo->mi_lockh;
- obddev = class_exp2obd(exp);
-
- obd_put_request_slot(&obddev->u.cli);
if (OBD_FAIL_CHECK(OBD_FAIL_MDC_GETATTR_ENQUEUE))
rc = -ETIMEDOUT;
struct lookup_intent *it = &minfo->mi_it;
struct ptlrpc_request *req;
struct mdc_getattr_args *ga;
- struct obd_device *obddev = class_exp2obd(exp);
struct ldlm_res_id res_id;
union ldlm_policy_data policy = {
.l_inodebits = { MDS_INODELOCK_LOOKUP |
if (IS_ERR(req))
RETURN(PTR_ERR(req));
- rc = obd_get_request_slot(&obddev->u.cli);
- if (rc != 0) {
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
-
/* With Data-on-MDT the glimpse callback is needed too.
* It is set here in advance but not in mdc_finish_enqueue()
* to avoid possible races. It is safe to have glimpse handler
rc = ldlm_cli_enqueue(exp, &req, &minfo->mi_einfo, &res_id, &policy,
&flags, NULL, 0, LVB_T_NONE, &minfo->mi_lockh, 1);
if (rc < 0) {
- obd_put_request_slot(&obddev->u.cli);
ptlrpc_req_finished(req);
RETURN(rc);
}
request->rq_send_state = level;
- mdc_get_mod_rpc_slot(request, NULL);
+ ptlrpc_get_mod_rpc_slot(request);
rc = ptlrpc_queue_wait(request);
- mdc_put_mod_rpc_slot(request, NULL);
+ ptlrpc_put_mod_rpc_slot(request);
if (rc)
CDEBUG(D_INFO, "error in handling %d\n", rc);
else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
/* make rpc */
if (opcode == MDS_REINT)
- mdc_get_mod_rpc_slot(req, NULL);
+ ptlrpc_get_mod_rpc_slot(req);
rc = ptlrpc_queue_wait(req);
if (opcode == MDS_REINT)
- mdc_put_mod_rpc_slot(req, NULL);
+ ptlrpc_put_mod_rpc_slot(req);
if (rc)
ptlrpc_req_finished(req);
ptlrpc_request_set_replen(req);
- mdc_get_mod_rpc_slot(req, NULL);
+ ptlrpc_get_mod_rpc_slot(req);
rc = ptlrpc_queue_wait(req);
- mdc_put_mod_rpc_slot(req, NULL);
+ ptlrpc_put_mod_rpc_slot(req);
if (req->rq_repmsg == NULL) {
CDEBUG(D_RPCTRACE, "request %p failed to send: rc = %d\n", req,
ptlrpc_request_set_replen(req);
- mdc_get_mod_rpc_slot(req, NULL);
+ ptlrpc_get_mod_rpc_slot(req);
rc = ptlrpc_queue_wait(req);
- mdc_put_mod_rpc_slot(req, NULL);
+ ptlrpc_put_mod_rpc_slot(req);
GOTO(out, rc);
out:
ptlrpc_request_set_replen(req);
- mdc_get_mod_rpc_slot(req, NULL);
+ ptlrpc_get_mod_rpc_slot(req);
rc = ptlrpc_queue_wait(req);
- mdc_put_mod_rpc_slot(req, NULL);
+ ptlrpc_put_mod_rpc_slot(req);
GOTO(out, rc);
out:
ptlrpc_request_set_replen(req);
- mdc_get_mod_rpc_slot(req, NULL);
+ ptlrpc_get_mod_rpc_slot(req);
rc = ptlrpc_queue_wait(req);
- mdc_put_mod_rpc_slot(req, NULL);
+ ptlrpc_put_mod_rpc_slot(req);
GOTO(out, rc);
return avail;
}
-static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
-{
- if (it != NULL &&
- (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
- it->it_op == IT_READDIR ||
- (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
- return true;
- return false;
-}
/* Get a modify RPC slot from the obd client @cli according
* to the kind of operation @opc that is going to be sent
* Returns the tag to be set in the request message. Tag 0
* is reserved for non-modifying requests.
*/
-__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
- struct lookup_intent *it)
+__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
{
bool close_req = false;
__u16 i, max;
- /* read-only metadata RPCs don't consume a slot on MDT
- * for reply reconstruction
- */
- if (obd_skip_mod_rpc_slot(it))
- return 0;
-
if (opc == MDS_CLOSE)
close_req = true;
EXPORT_SYMBOL(obd_get_mod_rpc_slot);
/* Put a modify RPC slot from the obd client @cli according
- * to the kind of operation @opc that has been sent and the
- * intent @it of the operation if it applies.
+ * to the kind of operation @opc that has been sent.
*/
-void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
- struct lookup_intent *it, __u16 tag)
+void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
{
bool close_req = false;
- if (obd_skip_mod_rpc_slot(it))
+ if (tag == 0)
return;
if (opc == MDS_CLOSE)
static atomic64_t ptlrpc_last_xid;
-void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
+static void ptlrpc_reassign_next_xid(struct ptlrpc_request *req)
{
spin_lock(&req->rq_import->imp_lock);
list_del_init(&req->rq_unreplied_list);
spin_unlock(&req->rq_import->imp_lock);
DEBUG_REQ(D_RPCTRACE, req, "reassign xid");
}
-EXPORT_SYMBOL(ptlrpc_reassign_next_xid);
+
+/**
+ * Take a modify RPC slot for \a req.
+ *
+ * The tag handed out by obd_get_mod_rpc_slot() for the opcode of the
+ * request is stored in the request message, after which
+ * ptlrpc_reassign_next_xid() is called on the request.
+ */
+void ptlrpc_get_mod_rpc_slot(struct ptlrpc_request *req)
+{
+	__u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
+	__u16 tag;
+
+	tag = obd_get_mod_rpc_slot(&req->rq_import->imp_obd->u.cli, opc);
+	lustre_msg_set_tag(req->rq_reqmsg, tag);
+	ptlrpc_reassign_next_xid(req);
+}
+EXPORT_SYMBOL(ptlrpc_get_mod_rpc_slot);
+
+/**
+ * Give back the modify RPC slot recorded in \a req, if there is one.
+ *
+ * The slot is identified by the tag stored in the request message; a tag
+ * of 0 means there is nothing to release and the call is a no-op.
+ */
+void ptlrpc_put_mod_rpc_slot(struct ptlrpc_request *req)
+{
+	__u16 tag = lustre_msg_get_tag(req->rq_reqmsg);
+
+	if (tag == 0)
+		return;
+
+	obd_put_mod_rpc_slot(&req->rq_import->imp_obd->u.cli,
+			     lustre_msg_get_opc(req->rq_reqmsg), tag);
+}
+EXPORT_SYMBOL(ptlrpc_put_mod_rpc_slot);
int ptlrpc_request_bufs_pack(struct ptlrpc_request *request,
__u32 version, int opcode, char **bufs,
list_del(&trd->trd_list);
ted->ted_reply_cnt--;
- if (lut != NULL)
+ if (lut != NULL && trd->trd_index != TRD_INDEX_MEMORY)
tgt_clear_reply_slot(lut, trd->trd_index);
OBD_FREE_PTR(trd);
}
}
}
-int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
+static int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
struct tg_export_data *ted, struct tg_reply_data *trd,
struct ptlrpc_request *req,
struct thandle *th, bool update_lrd_file)
ted->ted_lcd->lcd_last_transno = lrd->lrd_transno;
mutex_unlock(&ted->ted_lcd_lock);
- /* find a empty slot */
- i = tgt_find_free_reply_slot(tgt);
- if (unlikely(i < 0)) {
- CERROR("%s: couldn't find a slot for reply data: "
- "rc = %d\n", tgt_name(tgt), i);
- RETURN(i);
- }
- trd->trd_index = i;
+ if (tgt != NULL) {
+ /* find an empty slot */
+ i = tgt_find_free_reply_slot(tgt);
+ if (unlikely(i < 0)) {
+ CERROR("%s: couldn't find a slot for reply data: "
+ "rc = %d\n", tgt_name(tgt), i);
+ RETURN(i);
+ }
+ trd->trd_index = i;
- if (update_lrd_file) {
- loff_t off;
- int rc;
+ if (update_lrd_file) {
+ loff_t off;
+ int rc;
- /* write reply data to disk */
- off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
- rc = tgt_reply_data_write(env, tgt, lrd, off, th);
- if (unlikely(rc != 0)) {
- CERROR("%s: can't update %s file: rc = %d\n",
- tgt_name(tgt), REPLY_DATA, rc);
- RETURN(rc);
+ /* write reply data to disk */
+ off = sizeof(struct lsd_reply_header) + sizeof(*lrd) * i;
+ rc = tgt_reply_data_write(env, tgt, lrd, off, th);
+ if (unlikely(rc != 0)) {
+ CERROR("%s: can't update %s file: rc = %d\n",
+ tgt_name(tgt), REPLY_DATA, rc);
+ RETURN(rc);
+ }
}
+ } else {
+ trd->trd_index = TRD_INDEX_MEMORY;
}
+
/* add reply data to target export's reply list */
mutex_lock(&ted->ted_lcd_lock);
if (req != NULL) {
RETURN(0);
}
-EXPORT_SYMBOL(tgt_add_reply_data);
+
+/**
+ * Build a reply data record and add it to the reply list of an export.
+ *
+ * The XID and client generation are taken from \a req when one is given,
+ * otherwise the XID, result and client generation come from the target
+ * session info of \a env. The record is then passed to
+ * tgt_add_reply_data() and freed again if that call fails.
+ *
+ * \param[in] env		execution environment; must not be NULL when
+ *				\a req is NULL
+ * \param[in] tgt		target, passed on to tgt_add_reply_data()
+ * \param[in] ted		export data the record is added to
+ * \param[in] req		request being handled, may be NULL
+ * \param[in] opdata		value stored in lrd_data
+ * \param[in] th		transaction handle; th_result is read only
+ *				when \a req is set and \a write_update is true
+ * \param[in] write_update	with \a req set, also record the reply
+ *				pre-versions and th_result; always forwarded
+ *				to tgt_add_reply_data() as update_lrd_file
+ * \param[in] transno		value stored in lrd_transno
+ *
+ * \retval -ENOMEM	the record could not be allocated
+ * \retval other	return value of tgt_add_reply_data()
+ */
+int tgt_mk_reply_data(const struct lu_env *env,
+		      struct lu_target *tgt,
+		      struct tg_export_data *ted,
+		      struct ptlrpc_request *req,
+		      __u64 opdata,
+		      struct thandle *th,
+		      bool write_update,
+		      __u64 transno)
+{
+	struct tg_reply_data *trd;
+	struct lsd_reply_data *lrd;
+	__u64 *pre_versions = NULL;
+	int rc;
+
+	OBD_ALLOC_PTR(trd);
+	if (unlikely(trd == NULL))
+		RETURN(-ENOMEM);
+
+	/* fields that do not depend on where the request came from */
+	lrd = &trd->trd_reply;
+	lrd->lrd_transno = transno;
+	lrd->lrd_data = opdata;
+
+	if (req == NULL) {
+		struct tgt_session_info *tsi;
+
+		/* no request at hand: fall back to the session info */
+		LASSERT(env != NULL);
+		tsi = tgt_ses_info(env);
+		LASSERT(tsi->tsi_xid != 0);
+
+		lrd->lrd_xid = tsi->tsi_xid;
+		lrd->lrd_result = tsi->tsi_result;
+		lrd->lrd_client_gen = tsi->tsi_client_gen;
+	} else {
+		lrd->lrd_xid = req->rq_xid;
+		trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
+		lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
+		if (write_update) {
+			pre_versions = lustre_msg_get_versions(req->rq_repmsg);
+			lrd->lrd_result = th->th_result;
+		}
+	}
+
+	if (pre_versions != NULL) {
+		int i;
+
+		/* keep the four pre-versions of the reply */
+		for (i = 0; i < 4; i++)
+			trd->trd_pre_versions[i] = pre_versions[i];
+	}
+
+	rc = tgt_add_reply_data(env, tgt, ted, trd, req, th, write_update);
+	if (rc < 0)
+		OBD_FREE_PTR(trd);
+	return rc;
+}
+EXPORT_SYMBOL(tgt_mk_reply_data);
/*
* last_rcvd & last_committed update callbacks
/* Target that supports multiple reply data */
if (tgt_is_multimodrpcs_client(exp)) {
- struct tg_reply_data *trd;
- struct lsd_reply_data *lrd;
- __u64 *pre_versions;
- bool write_update;
-
- OBD_ALLOC_PTR(trd);
- if (unlikely(trd == NULL))
- RETURN(-ENOMEM);
-
- /* fill reply data information */
- lrd = &trd->trd_reply;
- lrd->lrd_transno = tti->tti_transno;
- if (req != NULL) {
- lrd->lrd_xid = req->rq_xid;
- trd->trd_tag = lustre_msg_get_tag(req->rq_reqmsg);
- pre_versions = lustre_msg_get_versions(req->rq_repmsg);
- lrd->lrd_result = th->th_result;
- lrd->lrd_client_gen = ted->ted_lcd->lcd_generation;
- write_update = true;
- } else {
- LASSERT(tsi->tsi_xid != 0);
- lrd->lrd_xid = tsi->tsi_xid;
- lrd->lrd_result = tsi->tsi_result;
- lrd->lrd_client_gen = tsi->tsi_client_gen;
- trd->trd_tag = 0;
- pre_versions = NULL;
- write_update = false;
- }
-
- lrd->lrd_data = opdata;
- if (pre_versions) {
- trd->trd_pre_versions[0] = pre_versions[0];
- trd->trd_pre_versions[1] = pre_versions[1];
- trd->trd_pre_versions[2] = pre_versions[2];
- trd->trd_pre_versions[3] = pre_versions[3];
- }
-
- rc = tgt_add_reply_data(env, tgt, ted, trd, req,
- th, write_update);
- if (rc < 0)
- OBD_FREE_PTR(trd);
- return rc;
+ return tgt_mk_reply_data(env, tgt, ted, req, opdata, th,
+ !!(req != NULL), tti->tti_transno);
}
/* Enough for update replay, let's return */