* rq_list
*/
spinlock_t rq_lock;
+ spinlock_t rq_early_free_lock;
/** client-side flags are serialized by rq_lock @{ */
unsigned int rq_intr:1, rq_replied:1, rq_err:1,
rq_timedout:1, rq_resend:1, rq_restart:1,
rq_allow_replay:1,
/* bulk request, sent to server, but uncommitted */
rq_unstable:1,
+ rq_early_free_repbuf:1, /* free reply buffer in advance */
rq_allow_intr:1;
/** @} */
body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
LASSERT(body != NULL);
- och = mod->mod_och;
- if (och != NULL) {
- struct lustre_handle *file_fh;
+ spin_lock(&req->rq_lock);
+ och = mod->mod_och;
+ if (och && och->och_fh.cookie)
+ req->rq_early_free_repbuf = 1;
+ else
+ req->rq_early_free_repbuf = 0;
+ spin_unlock(&req->rq_lock);
+
+ if (req->rq_early_free_repbuf) {
+ struct lustre_handle *file_fh;
- LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
+ LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC);
- file_fh = &och->och_fh;
+ file_fh = &och->och_fh;
CDEBUG(D_HA, "updating handle from %#llx to %#llx\n",
file_fh->cookie, body->mbo_handle.cookie);
old = *file_fh;
*file_fh = body->mbo_handle;
- }
- close_req = mod->mod_close_req;
- if (close_req != NULL) {
- __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
- struct mdt_ioepoch *epoch;
+ }
+
+ close_req = mod->mod_close_req;
+ if (close_req) {
+ __u32 opc = lustre_msg_get_opc(close_req->rq_reqmsg);
+ struct mdt_ioepoch *epoch;
LASSERT(opc == MDS_CLOSE);
epoch = req_capsule_client_get(&close_req->rq_pill,
&RMF_MDT_EPOCH);
LASSERT(epoch);
- if (och != NULL)
+ if (req->rq_early_free_repbuf)
LASSERT(!memcmp(&old, &epoch->mio_handle, sizeof(old)));
DEBUG_REQ(D_HA, close_req, "updating close body with new fh");
mod->mod_open_req = open_req;
open_req->rq_cb_data = mod;
open_req->rq_commit_cb = mdc_commit_open;
+ open_req->rq_early_free_repbuf = 1;
spin_unlock(&open_req->rq_lock);
}
if (mod == NULL)
RETURN(0);
- LASSERT(mod != LP_POISON);
+ LASSERT(mod != LP_POISON);
LASSERT(mod->mod_open_req != NULL);
+
+ spin_lock(&mod->mod_open_req->rq_lock);
+ if (mod->mod_och)
+ mod->mod_och->och_fh.cookie = 0;
+ mod->mod_open_req->rq_early_free_repbuf = 0;
+ spin_unlock(&mod->mod_open_req->rq_lock);
mdc_free_open(mod);
mod->mod_och = NULL;
* Drops one reference count for request \a request.
* \a locked set indicates that caller holds import imp_lock.
* Frees the request whe reference count reaches zero.
+ *
+ * \retval 1 the request is freed
+ * \retval 0 some others still hold references on the request
*/
static int __ptlrpc_req_finished(struct ptlrpc_request *request, int locked)
{
-	ENTRY;
-	if (request == NULL)
-		RETURN(1);
+	int count;
+	ENTRY;
-	if (request == LP_POISON ||
-	    request->rq_reqmsg == LP_POISON) {
-		CERROR("dereferencing freed request (bug 575)\n");
-		LBUG();
-		RETURN(1);
-	}
+	if (!request)
+		RETURN(1);
-	DEBUG_REQ(D_INFO, request, "refcount now %u",
+	/* Catch use-after-free of the request or its request message;
+	 * freed memory is poisoned with LP_POISON (was bug 575). */
+	LASSERT(request != LP_POISON);
+	LASSERT(request->rq_reqmsg != LP_POISON);
+
+	DEBUG_REQ(D_INFO, request, "refcount now %u",
		  atomic_read(&request->rq_refcount) - 1);
-	if (atomic_dec_and_test(&request->rq_refcount)) {
-		__ptlrpc_free_req(request, locked);
-		RETURN(1);
-	}
+	/* Hold rq_lock across the decrement so that only one thread can
+	 * observe the count dropping to 1 and perform the early reply
+	 * buffer release below. */
+	spin_lock(&request->rq_lock);
+	count = atomic_dec_return(&request->rq_refcount);
+	LASSERTF(count >= 0, "Invalid ref count %d\n", count);
-	RETURN(0);
+	/* For open RPC, the client does not know the EA size (LOV, ACL, and
+	 * so on) before replied, then the client has to reserve very large
+	 * reply buffer. Such buffer will not be released until the RPC freed.
+	 * Since the open RPC is replayable, we need to keep it in the replay
+	 * list until close. If there are a lot of files opened concurrently,
+	 * then the client may be OOM.
+	 *
+	 * In fact, it is unnecessary to keep reply buffer for open replay,
+	 * related EAs have already been saved via mdc_save_lovea() before
+	 * coming here. So it is safe to free the reply buffer some earlier
+	 * before releasing the RPC to avoid client OOM. LU-9514 */
+	if (count == 1 && request->rq_early_free_repbuf && request->rq_repbuf) {
+		/* rq_early_free_lock keeps readers of the reply message
+		 * (e.g. _debug_req()) from dereferencing rq_repmsg while
+		 * the buffer is being torn down. */
+		spin_lock(&request->rq_early_free_lock);
+		sptlrpc_cli_free_repbuf(request);
+		request->rq_repbuf = NULL;
+		request->rq_repbuf_len = 0;
+		request->rq_repdata = NULL;
+		request->rq_reqdata_len = 0;
+		spin_unlock(&request->rq_early_free_lock);
+	}
+	spin_unlock(&request->rq_lock);
+
+	if (!count)
+		__ptlrpc_free_req(request, locked);
+
+	RETURN(!count);
}
/**
DEBUG_REQ(D_HA, req, "REPLAY");
atomic_inc(&req->rq_import->imp_replay_inflight);
+ spin_lock(&req->rq_lock);
+ req->rq_early_free_repbuf = 0;
+ spin_unlock(&req->rq_lock);
ptlrpc_request_addref(req); /* ptlrpcd needs a ref */
ptlrpcd_add_req(req);
static inline int rep_ptlrpc_body_swabbed(struct ptlrpc_request *req)
{
-	LASSERT(req->rq_repmsg);
+	/* The reply buffer can now be released before the request itself
+	 * (rq_early_free_repbuf, LU-9514), so a NULL rq_repmsg is no longer
+	 * a bug here: report "not swabbed" instead of asserting. */
+	if (unlikely(!req->rq_repmsg))
+		return 0;
-	switch (req->rq_repmsg->lm_magic) {
-	case LUSTRE_MSG_MAGIC_V2:
-		return lustre_rep_swabbed(req, MSG_PTLRPC_BODY_OFF);
-	default:
-		/* uninitialized yet */
-		return 0;
-	}
+	switch (req->rq_repmsg->lm_magic) {
+	case LUSTRE_MSG_MAGIC_V2:
+		return lustre_rep_swabbed(req, MSG_PTLRPC_BODY_OFF);
+	default:
+		/* uninitialized yet */
+		return 0;
+	}
}
void _debug_req(struct ptlrpc_request *req,
-	       struct libcfs_debug_msg_data *msgdata,
-	       const char *fmt, ... )
+	       struct libcfs_debug_msg_data *msgdata, const char *fmt, ...)
{
-	int req_ok = req->rq_reqmsg != NULL;
-	int rep_ok = req->rq_repmsg != NULL;
-	lnet_nid_t nid = LNET_NID_ANY;
-	va_list args;
+	bool req_ok = req->rq_reqmsg != NULL;
+	bool rep_ok = false;
+	lnet_nid_t nid = LNET_NID_ANY;
+	va_list args;
+	/* Snapshot of the reply flags/status taken under the lock below;
+	 * -1 means "no valid reply message available". */
+	int rep_flags = -1;
+	int rep_status = -1;
-	if (ptlrpc_req_need_swab(req)) {
-		req_ok = req_ok && req_ptlrpc_body_swabbed(req);
-		rep_ok = rep_ok && rep_ptlrpc_body_swabbed(req);
-	}
+	/* Hold rq_early_free_lock so the reply buffer cannot be freed
+	 * (see the early-free path in __ptlrpc_req_finished) while we
+	 * examine rq_repmsg; all reply-message reads happen inside this
+	 * critical section and only the snapshots are used afterwards. */
+	spin_lock(&req->rq_early_free_lock);
+	if (req->rq_repmsg)
+		rep_ok = true;
+
+	if (ptlrpc_req_need_swab(req)) {
+		req_ok = req_ok && req_ptlrpc_body_swabbed(req);
+		rep_ok = rep_ok && rep_ptlrpc_body_swabbed(req);
+	}
+
+	if (rep_ok) {
+		rep_flags = lustre_msg_get_flags(req->rq_repmsg);
+		rep_status = lustre_msg_get_status(req->rq_repmsg);
+	}
+	spin_unlock(&req->rq_early_free_lock);
-	if (req->rq_import && req->rq_import->imp_connection)
-		nid = req->rq_import->imp_connection->c_peer.nid;
-	else if (req->rq_export && req->rq_export->exp_connection)
-		nid = req->rq_export->exp_connection->c_peer.nid;
+	/* Prefer the client-side (import) peer NID, fall back to the
+	 * server-side (export) peer NID. */
+	if (req->rq_import && req->rq_import->imp_connection)
+		nid = req->rq_import->imp_connection->c_peer.nid;
+	else if (req->rq_export && req->rq_export->exp_connection)
+		nid = req->rq_export->exp_connection->c_peer.nid;
	va_start(args, fmt);
	libcfs_debug_vmsg2(msgdata, fmt, args,
			   atomic_read(&req->rq_refcount),
			   DEBUG_REQ_FLAGS(req),
			   req_ok ? lustre_msg_get_flags(req->rq_reqmsg) : -1,
-			   rep_ok ? lustre_msg_get_flags(req->rq_repmsg) : -1,
-			   req->rq_status,
-			   rep_ok ? lustre_msg_get_status(req->rq_repmsg) : -1);
+			   rep_flags, req->rq_status, rep_status);
	va_end(args);
}
EXPORT_SYMBOL(_debug_req);
static inline void ptlrpc_req_comm_init(struct ptlrpc_request *req)
{
spin_lock_init(&req->rq_lock);
+ spin_lock_init(&req->rq_early_free_lock);
atomic_set(&req->rq_refcount, 1);
INIT_LIST_HEAD(&req->rq_list);
INIT_LIST_HEAD(&req->rq_replay_list);
[ "$SLOW" = "no" ] && EXCEPT_SLOW="27m 64b 68 71 115 300o"
if [ $(facet_fstype $SINGLEMDS) = "zfs" ]; then
- # bug number for skipped test: LU-9514 LU-4536 LU-1957
- ALWAYS_EXCEPT="$ALWAYS_EXCEPT 51f 65ic 180"
+ # bug number for skipped test: LU-4536 LU-1957
+ ALWAYS_EXCEPT="$ALWAYS_EXCEPT 65ic 180"
# 13 (min)"
[ "$SLOW" = "no" ] && EXCEPT_SLOW="$EXCEPT_SLOW 51b"
fi