}
}
+/** Change a request's export and move the hp request from the old export to the new one */
+void ptlrpc_request_change_export(struct ptlrpc_request *req,
+ struct obd_export *export)
+{
+ if (req->rq_export != NULL) {
+ if (!cfs_list_empty(&req->rq_exp_list)) {
+ /* remove rq_exp_list from last export */
+ spin_lock_bh(&req->rq_export->exp_rpc_lock);
+ cfs_list_del_init(&req->rq_exp_list);
+ spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+
+ /* the export already has one reference, so it's safe to
+ * add the request to the export queue here and take another
+ * reference for the request later */
+ spin_lock_bh(&export->exp_rpc_lock);
+ cfs_list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+ spin_unlock_bh(&export->exp_rpc_lock);
+ }
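+ /* drop the RPC count and the reference held on the old export */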
+ class_export_rpc_dec(req->rq_export);
+ class_export_put(req->rq_export);
+ }
+
+ /* the request takes one reference and one RPC count on the new export */
+ req->rq_export = class_export_get(export);
+ class_export_rpc_inc(export);
+}
+
/**
* to finish a request: stop sending more early replies, and release
- * the request. should be called after we finished handling the request.
+ * the request.
*/
static void ptlrpc_server_finish_request(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
ptlrpc_server_hpreq_fini(req);
+ ptlrpc_server_drop_request(req);
+}
+
+/**
+ * to finish an active request: stop sending more early replies, and release
+ * the request. should be called after we have finished handling the request.
+ */
+static void ptlrpc_server_finish_active_request(
+ struct ptlrpc_service_part *svcpt,
+ struct ptlrpc_request *req)
+{
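+ /* undo the "active" accounting done in ptlrpc_server_request_get() */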
spin_lock(&svcpt->scp_req_lock);
ptlrpc_nrs_req_stop_nolock(req);
svcpt->scp_nreqs_active--;
ptlrpc_nrs_req_finalize(req);
- ptlrpc_server_drop_request(req);
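+ /* balance the class_export_rpc_inc() taken in
+ * ptlrpc_server_request_get() */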
+ if (req->rq_export != NULL)
+ class_export_rpc_dec(req->rq_export);
+
+ ptlrpc_server_finish_request(svcpt, req);
}
/**
GOTO(out, rc = -ENODEV);
/* RPC ref */
- class_export_rpc_get(reqcopy->rq_export);
+ class_export_rpc_inc(reqcopy->rq_export);
if (reqcopy->rq_export->exp_obd &&
reqcopy->rq_export->exp_obd->obd_fail)
GOTO(out_put, rc = -ENODEV);
ptlrpc_req_drop_rs(reqcopy);
out_put:
- class_export_rpc_put(reqcopy->rq_export);
+ class_export_rpc_dec(reqcopy->rq_export);
class_export_put(reqcopy->rq_export);
out:
sptlrpc_svc_ctx_decref(reqcopy);
static struct ptlrpc_request *
ptlrpc_server_request_get(struct ptlrpc_service_part *svcpt, bool force)
{
- struct ptlrpc_request *req;
+ struct ptlrpc_request *req = NULL;
ENTRY;
+ spin_lock(&svcpt->scp_req_lock);
+#ifndef __KERNEL__
+ /* !@%$# liblustre only has 1 thread */
+ if (cfs_atomic_read(&svcpt->scp_nreps_difficult) != 0) {
+ spin_unlock(&svcpt->scp_req_lock);
+ RETURN(NULL);
+ }
+#endif
+
if (ptlrpc_server_high_pending(svcpt, force)) {
req = ptlrpc_nrs_req_get_nolock(svcpt, true, force);
if (req != NULL) {
svcpt->scp_hreq_count++;
- RETURN(req);
+ goto got_request;
}
}
req = ptlrpc_nrs_req_get_nolock(svcpt, false, force);
if (req != NULL) {
svcpt->scp_hreq_count = 0;
- RETURN(req);
+ goto got_request;
}
}
+
+ spin_unlock(&svcpt->scp_req_lock);
RETURN(NULL);
+
+got_request:
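+ /* a request was selected; account it as active while
+ * scp_req_lock is still held */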
+ svcpt->scp_nreqs_active++;
+ if (req->rq_hp)
+ svcpt->scp_nhreqs_active++;
+
+ spin_unlock(&svcpt->scp_req_lock);
+
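+ /* take an RPC count on the export; dropped again in
+ * ptlrpc_server_finish_active_request() */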
+ if (likely(req->rq_export))
+ class_export_rpc_inc(req->rq_export);
+
+ RETURN(req);
}
/**
req->rq_export = class_conn2export(
lustre_msg_get_handle(req->rq_reqmsg));
if (req->rq_export) {
- class_export_rpc_get(req->rq_export);
rc = ptlrpc_check_req(req);
if (rc == 0) {
rc = sptlrpc_target_export_check(req->rq_export, req);
/* Move it over to the request processing queue */
rc = ptlrpc_server_request_add(svcpt, req);
- if (rc) {
- ptlrpc_server_hpreq_fini(req);
+ if (rc)
GOTO(err_req, rc);
- }
+
cfs_waitq_signal(&svcpt->scp_waitq);
RETURN(1);
err_req:
- if (req->rq_export)
- class_export_rpc_put(req->rq_export);
- spin_lock(&svcpt->scp_req_lock);
- svcpt->scp_nreqs_active++;
- spin_unlock(&svcpt->scp_req_lock);
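+ /* the request was never accounted as active, so a plain
+ * finish is sufficient here */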
ptlrpc_server_finish_request(svcpt, req);
RETURN(1);
struct ptlrpc_thread *thread)
{
struct ptlrpc_service *svc = svcpt->scp_service;
- struct obd_export *export = NULL;
struct ptlrpc_request *request;
struct timeval work_start;
struct timeval work_end;
int fail_opc = 0;
ENTRY;
- spin_lock(&svcpt->scp_req_lock);
-#ifndef __KERNEL__
- /* !@%$# liblustre only has 1 thread */
- if (cfs_atomic_read(&svcpt->scp_nreps_difficult) != 0) {
- spin_unlock(&svcpt->scp_req_lock);
- RETURN(0);
- }
-#endif
request = ptlrpc_server_request_get(svcpt, false);
- if (request == NULL) {
- spin_unlock(&svcpt->scp_req_lock);
- RETURN(0);
- }
+ if (request == NULL)
+ RETURN(0);
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT))
fail_opc = OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT;
fail_opc = OBD_FAIL_PTLRPC_HPREQ_TIMEOUT;
if (unlikely(fail_opc)) {
- if (request->rq_export && request->rq_ops) {
- spin_unlock(&svcpt->scp_req_lock);
-
+ if (request->rq_export && request->rq_ops)
OBD_FAIL_TIMEOUT(fail_opc, 4);
-
- spin_lock(&svcpt->scp_req_lock);
- }
}
- svcpt->scp_nreqs_active++;
- if (request->rq_hp)
- svcpt->scp_nhreqs_active++;
-
- spin_unlock(&svcpt->scp_req_lock);
ptlrpc_rqphase_move(request, RQ_PHASE_INTERPRET);
at_get(&svcpt->scp_at_estimate));
}
- export = request->rq_export;
rc = lu_context_init(&request->rq_session, LCT_SESSION | LCT_NOREF);
if (rc) {
CERROR("Failure to initialize session: %d\n", rc);
}
out_req:
- if (export != NULL)
- class_export_rpc_put(export);
- ptlrpc_server_finish_request(svcpt, request);
+ ptlrpc_server_finish_active_request(svcpt, request);
RETURN(1);
}
cfs_list_del(&req->rq_list);
svcpt->scp_nreqs_incoming--;
- svcpt->scp_nreqs_active++;
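+ /* still on the incoming queue, never accounted as active */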
ptlrpc_server_finish_request(svcpt, req);
}
while (ptlrpc_server_request_pending(svcpt, true)) {
req = ptlrpc_server_request_get(svcpt, true);
- svcpt->scp_nreqs_active++;
- ptlrpc_server_hpreq_fini(req);
-
- if (req->rq_export != NULL)
- class_export_rpc_put(req->rq_export);
- ptlrpc_server_finish_request(svcpt, req);
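+ /* picked via ptlrpc_server_request_get(), so it is accounted
+ * as active and holds an export RPC count */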
+ ptlrpc_server_finish_active_request(svcpt, req);
}
LASSERT(cfs_list_empty(&svcpt->scp_rqbd_posted));