*/
struct ptlrpc_service *
ptlrpc_register_service(struct ptlrpc_service_conf *conf,
- cfs_proc_dir_entry_t *proc_entry)
+ struct proc_dir_entry *proc_entry)
{
struct ptlrpc_service_cpt_conf *cconf = &conf->psc_cpt;
struct ptlrpc_service *service;
struct obd_export *export)
{
if (req->rq_export != NULL) {
- if (!cfs_list_empty(&req->rq_exp_list)) {
- /* remove rq_exp_list from last export */
- spin_lock_bh(&req->rq_export->exp_rpc_lock);
- cfs_list_del_init(&req->rq_exp_list);
- spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+ LASSERT(!list_empty(&req->rq_exp_list));
+ /* remove rq_exp_list from last export */
+ spin_lock_bh(&req->rq_export->exp_rpc_lock);
+ list_del_init(&req->rq_exp_list);
+ spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+ /* export has one reference already, so it`s safe to
+ * add req to export queue here and get another
+ * reference for request later */
+ spin_lock_bh(&export->exp_rpc_lock);
+ if (req->rq_ops != NULL) /* hp request */
+ list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
+ else
+ list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
+ spin_unlock_bh(&export->exp_rpc_lock);
- /* export has one reference already, so it`s safe to
- * add req to export queue here and get another
- * reference for request later */
- spin_lock_bh(&export->exp_rpc_lock);
- cfs_list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
- spin_unlock_bh(&export->exp_rpc_lock);
- }
class_export_rpc_dec(req->rq_export);
class_export_put(req->rq_export);
}
{
ptlrpc_server_hpreq_fini(req);
+ if (req->rq_session.lc_thread != NULL) {
+ lu_context_exit(&req->rq_session);
+ lu_context_fini(&req->rq_session);
+ }
+
ptlrpc_server_drop_request(req);
}
RETURN(1); /* return "did_something" for liblustre */
}
+/* Check if we are already handling earlier incarnation of this request.
+ * Called under &req->rq_export->exp_rpc_lock locked */
+static int ptlrpc_server_check_resend_in_progress(struct ptlrpc_request *req)
+{
+ struct ptlrpc_request *tmp = NULL;
+
+ if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) ||
+ (cfs_atomic_read(&req->rq_export->exp_rpc_count) == 0))
+ return 0;
+
+ /* bulk request are aborted upon reconnect, don't try to
+ * find a match */
+ if (req->rq_bulk_write || req->rq_bulk_read)
+ return 0;
+
+ /* This list should not be longer than max_requests in
+ * flights on the client, so it is not all that long.
+ * Also we only hit this codepath in case of a resent
+ * request which makes it even more rarely hit */
+ cfs_list_for_each_entry(tmp, &req->rq_export->exp_reg_rpcs,
+ rq_exp_list) {
+ /* Found duplicate one */
+ if (tmp->rq_xid == req->rq_xid)
+ goto found;
+ }
+ cfs_list_for_each_entry(tmp, &req->rq_export->exp_hp_rpcs,
+ rq_exp_list) {
+ /* Found duplicate one */
+ if (tmp->rq_xid == req->rq_xid)
+ goto found;
+ }
+ return 0;
+
+found:
+ DEBUG_REQ(D_HA, req, "Found duplicate req in processing\n");
+ DEBUG_REQ(D_HA, tmp, "Request being processed\n");
+ return -EBUSY;
+}
+
/**
* Put the request to the export list if the request may become
* a high priority one.
static int ptlrpc_server_hpreq_init(struct ptlrpc_service_part *svcpt,
struct ptlrpc_request *req)
{
- int rc = 0;
+ cfs_list_t *list;
+ int rc, hp = 0;
+
ENTRY;
if (svcpt->scp_service->srv_ops.so_hpreq_handler) {
RETURN(rc);
LASSERT(rc == 0);
}
- if (req->rq_export && req->rq_ops) {
- /* Perform request specific check. We should do this check
- * before the request is added into exp_hp_rpcs list otherwise
- * it may hit swab race at LU-1044. */
- if (req->rq_ops->hpreq_check) {
- rc = req->rq_ops->hpreq_check(req);
- /**
- * XXX: Out of all current
- * ptlrpc_hpreq_ops::hpreq_check(), only
- * ldlm_cancel_hpreq_check() can return an error code;
- * other functions assert in similar places, which seems
- * odd. What also does not seem right is that handlers
- * for those RPCs do not assert on the same checks, but
- * rather handle the error cases. e.g. see
- * ost_rw_hpreq_check(), and ost_brw_read(),
- * ost_brw_write().
- */
- if (rc < 0)
- RETURN(rc);
- LASSERT(rc == 0 || rc == 1);
+ if (req->rq_export) {
+ if (req->rq_ops) {
+ /* Perform request specific check. We should do this
+ * check before the request is added into exp_hp_rpcs
+ * list otherwise it may hit swab race at LU-1044. */
+ if (req->rq_ops->hpreq_check) {
+ rc = req->rq_ops->hpreq_check(req);
+ /**
+ * XXX: Out of all current
+ * ptlrpc_hpreq_ops::hpreq_check(), only
+ * ldlm_cancel_hpreq_check() can return an
+ * error code; other functions assert in
+ * similar places, which seems odd.
+ * What also does not seem right is that
+ * handlers for those RPCs do not assert
+ * on the same checks, but rather handle the
+ * error cases. e.g. see ost_rw_hpreq_check(),
+ * and ost_brw_read(), ost_brw_write().
+ */
+ if (rc < 0)
+ RETURN(rc);
+ LASSERT(rc == 0 || rc == 1);
+ hp = rc;
+ }
+ list = &req->rq_export->exp_hp_rpcs;
+ } else {
+ list = &req->rq_export->exp_reg_rpcs;
}
+ /* do search for duplicated xid and the adding to the list
+ * atomically */
spin_lock_bh(&req->rq_export->exp_rpc_lock);
- cfs_list_add(&req->rq_exp_list,
- &req->rq_export->exp_hp_rpcs);
+ rc = ptlrpc_server_check_resend_in_progress(req);
+ if (rc < 0) {
+ spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+ RETURN(rc);
+ }
+ cfs_list_add(&req->rq_exp_list, list);
spin_unlock_bh(&req->rq_export->exp_rpc_lock);
}
- ptlrpc_nrs_req_initialize(svcpt, req, rc);
+ ptlrpc_nrs_req_initialize(svcpt, req, !!hp);
- RETURN(rc);
+ RETURN(hp);
}
/** Remove the request from the export list. */
static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
{
- ENTRY;
- if (req->rq_export && req->rq_ops) {
- /* refresh lock timeout again so that client has more
- * room to send lock cancel RPC. */
- if (req->rq_ops->hpreq_fini)
- req->rq_ops->hpreq_fini(req);
+ ENTRY;
+ if (req->rq_export) {
+ /* refresh lock timeout again so that client has more
+ * room to send lock cancel RPC. */
+ if (req->rq_ops && req->rq_ops->hpreq_fini)
+ req->rq_ops->hpreq_fini(req);
spin_lock_bh(&req->rq_export->exp_rpc_lock);
cfs_list_del_init(&req->rq_exp_list);
goto err_req;
}
- req->rq_svc_thread = thread;
+ req->rq_svc_thread = thread;
+ if (thread != NULL) {
+ /* initialize request session, it is needed for request
+ * processing by target */
+ rc = lu_context_init(&req->rq_session, LCT_SERVER_SESSION |
+ LCT_NOREF);
+ if (rc) {
+ CERROR("%s: failure to initialize session: rc = %d\n",
+ thread->t_name, rc);
+ goto err_req;
+ }
+ req->rq_session.lc_thread = thread;
+ lu_context_enter(&req->rq_session);
+ req->rq_svc_thread->t_env->le_ses = &req->rq_session;
+ }
- ptlrpc_at_add_timed(req);
+ ptlrpc_at_add_timed(req);
- /* Move it over to the request processing queue */
+ /* Move it over to the request processing queue */
rc = ptlrpc_server_request_add(svcpt, req);
if (rc)
GOTO(err_req, rc);
ptlrpc_server_handle_request(struct ptlrpc_service_part *svcpt,
struct ptlrpc_thread *thread)
{
- struct ptlrpc_service *svc = svcpt->scp_service;
- struct ptlrpc_request *request;
- struct timeval work_start;
- struct timeval work_end;
- long timediff;
- int rc;
- int fail_opc = 0;
- ENTRY;
+ struct ptlrpc_service *svc = svcpt->scp_service;
+ struct ptlrpc_request *request;
+ struct timeval work_start;
+ struct timeval work_end;
+ long timediff;
+ int fail_opc = 0;
+
+ ENTRY;
request = ptlrpc_server_request_get(svcpt, false);
if (request == NULL)
at_get(&svcpt->scp_at_estimate));
}
- rc = lu_context_init(&request->rq_session, LCT_SERVER_SESSION |
- LCT_NOREF);
- if (rc) {
- CERROR("Failure to initialize session: %d\n", rc);
- goto out_req;
- }
- request->rq_session.lc_thread = thread;
- request->rq_session.lc_cookie = 0x5;
- lu_context_enter(&request->rq_session);
-
- CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
-
- request->rq_svc_thread = thread;
- if (thread)
- request->rq_svc_thread->t_env->le_ses = &request->rq_session;
-
- if (likely(request->rq_export)) {
+ if (likely(request->rq_export)) {
if (unlikely(ptlrpc_check_req(request)))
goto put_conn;
ptlrpc_update_export_timer(request->rq_export, timediff >> 19);
if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
- rc = svc->srv_ops.so_req_handler(request);
+ CDEBUG(D_NET, "got req "LPU64"\n", request->rq_xid);
- ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
+ /* re-assign request and sesson thread to the current one */
+ request->rq_svc_thread = thread;
+ if (thread != NULL) {
+ LASSERT(request->rq_session.lc_thread != NULL);
+ request->rq_session.lc_thread = thread;
+ request->rq_session.lc_cookie = 0x55;
+ thread->t_env->le_ses = &request->rq_session;
+ }
+ svc->srv_ops.so_req_handler(request);
-put_conn:
- lu_context_exit(&request->rq_session);
- lu_context_fini(&request->rq_session);
+ ptlrpc_rqphase_move(request, RQ_PHASE_COMPLETE);
+put_conn:
if (unlikely(cfs_time_current_sec() > request->rq_deadline)) {
DEBUG_REQ(D_WARNING, request, "Request took longer "
"than estimated ("CFS_DURATION_T":"CFS_DURATION_T"s);"
request->rq_arrival_time.tv_sec));
}
-out_req:
ptlrpc_server_finish_active_request(svcpt, request);
RETURN(1);