static CFS_LIST_HEAD(ptlrpc_all_services);
cfs_spinlock_t ptlrpc_all_services_lock;
-static char *
-ptlrpc_alloc_request_buffer (int size)
-{
- char *ptr;
-
- if (size > SVC_BUF_VMALLOC_THRESHOLD)
- OBD_VMALLOC(ptr, size);
- else
- OBD_ALLOC(ptr, size);
-
- return (ptr);
-}
-
-static void
-ptlrpc_free_request_buffer (char *ptr, int size)
-{
- if (size > SVC_BUF_VMALLOC_THRESHOLD)
- OBD_VFREE(ptr, size);
- else
- OBD_FREE(ptr, size);
-}
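The removed helpers' threshold switch now lives behind OBD_ALLOC_LARGE and
OBD_FREE_LARGE. As a reading aid, here is a sketch of what those macros are
presumed to expand to; reusing SVC_BUF_VMALLOC_THRESHOLD is an assumption,
the real macros may carry their own threshold constant:

/* Sketch only: assumed to mirror the helpers deleted above. */
#define OBD_ALLOC_LARGE(ptr, size)                                      \
do {                                                                    \
        if ((size) > SVC_BUF_VMALLOC_THRESHOLD) /* assumed threshold */ \
                OBD_VMALLOC(ptr, size);                                 \
        else                                                            \
                OBD_ALLOC(ptr, size);                                   \
} while (0)

#define OBD_FREE_LARGE(ptr, size)                                       \
do {                                                                    \
        if ((size) > SVC_BUF_VMALLOC_THRESHOLD)                         \
                OBD_VFREE(ptr, size);                                   \
        else                                                            \
                OBD_FREE(ptr, size);                                    \
} while (0)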
-
struct ptlrpc_request_buffer_desc *
ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
{
rqbd->rqbd_cbid.cbid_fn = request_in_callback;
rqbd->rqbd_cbid.cbid_arg = rqbd;
CFS_INIT_LIST_HEAD(&rqbd->rqbd_reqs);
- rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);
+ OBD_ALLOC_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
if (rqbd->rqbd_buffer == NULL) {
OBD_FREE_PTR(rqbd);
return (NULL);
}

svc->srv_nbufs--;
cfs_spin_unlock(&svc->srv_lock);

- ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
+ OBD_FREE_LARGE(rqbd->rqbd_buffer, svc->srv_buf_size);
OBD_FREE_PTR(rqbd);
}
/**
- * increment the number of active requests consuming service threads.
- */
-void ptlrpc_server_active_request_inc(struct ptlrpc_request *req)
-{
- struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
- struct ptlrpc_service *svc = rqbd->rqbd_service;
-
- cfs_spin_lock(&svc->srv_rq_lock);
- svc->srv_n_active_reqs++;
- cfs_spin_unlock(&svc->srv_rq_lock);
-}
-
-/**
- * decrement the number of active requests consuming service threads.
- */
-void ptlrpc_server_active_request_dec(struct ptlrpc_request *req)
-{
- struct ptlrpc_request_buffer_desc *rqbd = req->rq_rqbd;
- struct ptlrpc_service *svc = rqbd->rqbd_service;
-
- cfs_spin_lock(&svc->srv_rq_lock);
- svc->srv_n_active_reqs--;
- cfs_spin_unlock(&svc->srv_rq_lock);
-}
-
-/**
* Drop a reference count of the request. If it reaches zero, we either
* put it into the history list or free it immediately.
*/
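The body of ptlrpc_server_drop_request() is not shown in this hunk; below is
a minimal sketch of the drop-to-zero pattern the comment describes, assuming
the usual rq_refcount field and a hypothetical ptlrpc_server_free_request()
helper:

static void sketch_server_drop_request(struct ptlrpc_request *req)
{
        struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;

        /* the last reference decides the request's fate */
        if (!cfs_atomic_dec_and_test(&req->rq_refcount))
                return;

        cfs_spin_lock(&svc->srv_lock);
        if (svc->srv_max_history_rqbds > 0) {
                /* retain the request for the service history */
                cfs_list_add_tail(&req->rq_history_list,
                                  &svc->srv_request_history);
                cfs_spin_unlock(&svc->srv_lock);
        } else {
                cfs_spin_unlock(&svc->srv_lock);
                ptlrpc_server_free_request(req); /* hypothetical helper */
        }
}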
static int ptlrpc_check_req(struct ptlrpc_request *req)
{
+ int rc = 0;
+
if (unlikely(lustre_msg_get_conn_cnt(req->rq_reqmsg) <
req->rq_export->exp_conn_cnt)) {
DEBUG_REQ(D_ERROR, req,
error response instead. */
CDEBUG(D_RPCTRACE, "Dropping req %p for failed obd %s\n",
req, req->rq_export->exp_obd->obd_name);
- req->rq_status = -ENODEV;
+ rc = -ENODEV;
+ } else if (lustre_msg_get_flags(req->rq_reqmsg) &
+ (MSG_REPLAY | MSG_REQ_REPLAY_DONE) &&
+ !(req->rq_export->exp_obd->obd_recovering)) {
+ DEBUG_REQ(D_ERROR, req,
+ "Invalid replay without recovery");
+ class_fail_export(req->rq_export);
+ rc = -ENODEV;
+ } else if (lustre_msg_get_transno(req->rq_reqmsg) != 0 &&
+ !(req->rq_export->exp_obd->obd_recovering)) {
+ DEBUG_REQ(D_ERROR, req, "Invalid req with transno "
+ LPU64" without recovery",
+ lustre_msg_get_transno(req->rq_reqmsg));
+ class_fail_export(req->rq_export);
+ rc = -ENODEV;
+ }
+
+ if (unlikely(rc < 0)) {
+ req->rq_status = rc;
ptlrpc_error(req);
- return -ENODEV;
}
-
- return 0;
+ return rc;
}
static void ptlrpc_at_set_timer(struct ptlrpc_service *svc)
OBD_ALLOC(reqcopy, sizeof *reqcopy);
if (reqcopy == NULL)
RETURN(-ENOMEM);
- OBD_ALLOC(reqmsg, req->rq_reqlen);
+ OBD_ALLOC_LARGE(reqmsg, req->rq_reqlen);
if (!reqmsg) {
OBD_FREE(reqcopy, sizeof *reqcopy);
RETURN(-ENOMEM);
class_export_put(reqcopy->rq_export);
out:
sptlrpc_svc_ctx_decref(reqcopy);
- OBD_FREE(reqmsg, req->rq_reqlen);
+ OBD_FREE_LARGE(reqmsg, req->rq_reqlen);
OBD_FREE(reqcopy, sizeof *reqcopy);
RETURN(rc);
}
*/
static int ptlrpc_server_allow_normal(struct ptlrpc_service *svc, int force)
{
+#ifndef __KERNEL__
+ if (1) /* always allow handling normal requests for liblustre */
+ return 1;
+#endif
if (force ||
svc->srv_n_active_reqs < svc->srv_threads_running - 2)
return 1;
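As a concrete reading of the test above: with srv_threads_running == 8, a
non-forced normal request is admitted only while srv_n_active_reqs is below
6, keeping two threads in reserve (presumably for high-priority work and
difficult replies).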
svc->srv_n_queued_reqs--;
/* Consider this still a "queued" request as far as stats are
concerned */
+ /* ptlrpc_hpreq_init() inserts it into the export list, and by the time
+ * ptlrpc_server_request_add() runs it could already have been handled
+ * and released. To avoid losing the request in between, take an extra
+ * reference on it. */
+ ptlrpc_request_addref(req);
cfs_spin_unlock(&svc->srv_lock);
/* go through security check/transform */
}
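The reference taken above is balanced on both exit paths later in this patch;
the lifetime bracket, in outline (the middle of the function is condensed into
a placeholder comment):

ptlrpc_request_addref(req);       /* pin across the unlock */
cfs_spin_unlock(&svc->srv_lock);
/* ... security transform, sanity checks, ptlrpc_server_request_add() ... */
ptlrpc_server_drop_request(req);  /* released on success and on err_req */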
if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_REQ_OPC) &&
- lustre_msg_get_opc(req->rq_reqmsg) == obd_fail_val) {
+ lustre_msg_get_opc(req->rq_reqmsg) == cfs_fail_val) {
CERROR("drop incoming rpc opc %u, x"LPU64"\n",
- obd_fail_val, req->rq_xid);
+ cfs_fail_val, req->rq_xid);
goto err_req;
}
if (rc)
GOTO(err_req, rc);
cfs_waitq_signal(&svc->srv_waitq);
+ ptlrpc_server_drop_request(req);
RETURN(1);
err_req:
+ ptlrpc_server_drop_request(req);
cfs_spin_lock(&svc->srv_rq_lock);
svc->srv_n_active_reqs++;
cfs_spin_unlock(&svc->srv_rq_lock);
lustre_msg_get_opc(request->rq_reqmsg));
if (lustre_msg_get_opc(request->rq_reqmsg) != OBD_PING)
- OBD_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, obd_fail_val);
+ CFS_FAIL_TIMEOUT_MS(OBD_FAIL_PTLRPC_PAUSE_REQ, cfs_fail_val);
rc = svc->srv_handler(request);
return !cfs_list_empty(&svc->srv_req_in_queue);
}
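+/* The wait is split out of the ptlrpc_main() loop below; __noinline__
+ * presumably keeps the l_wait_info bookkeeping out of the main loop's
+ * long-lived stack frame. */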
+static __attribute__((__noinline__)) int
+ptlrpc_wait_event(struct ptlrpc_service *svc,
+ struct ptlrpc_thread *thread)
+{
+ /* Don't exit while there are replies to be handled */
+ struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
+ ptlrpc_retry_rqbds, svc);
+
+ lc_watchdog_disable(thread->t_watchdog);
+
+ cfs_cond_resched();
+
+ l_wait_event_exclusive_head(svc->srv_waitq,
+ ptlrpc_thread_stopping(thread) ||
+ ptlrpc_server_request_waiting(svc) ||
+ ptlrpc_server_request_pending(svc, 0) ||
+ ptlrpc_rqbd_pending(svc) ||
+ ptlrpc_at_check(svc), &lwi);
+
+ if (ptlrpc_thread_stopping(thread))
+ return -EINTR;
+
+ lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
+
+ return 0;
+}
+
/**
* Main thread body for service threads.
* Waits in a loop for new requests to process as they appear.
env.le_ctx.lc_cookie = 0x6;
/* Alloc reply state structure for this one */
- OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, CFS_ALLOC_STD);
+ OBD_ALLOC_LARGE(rs, svc->srv_max_reply_size);
if (!rs) {
rc = -ENOMEM;
goto out_srv_fini;
/* XXX maintain a list of all managed devices: insert here */
while (!ptlrpc_thread_stopping(thread)) {
- /* Don't exit while there are replies to be handled */
- struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
- ptlrpc_retry_rqbds, svc);
-
- lc_watchdog_disable(thread->t_watchdog);
-
- cfs_cond_resched();
-
- l_wait_event_exclusive(svc->srv_waitq,
- ptlrpc_thread_stopping(thread) ||
- ptlrpc_server_request_waiting(svc) ||
- ptlrpc_server_request_pending(svc, 0) ||
- ptlrpc_rqbd_pending(svc) ||
- ptlrpc_at_check(svc), &lwi);
-
- if (ptlrpc_thread_stopping(thread))
+ if (ptlrpc_wait_event(svc, thread))
break;
- lc_watchdog_touch(thread->t_watchdog, CFS_GET_TIMEOUT(svc));
-
ptlrpc_check_rqbd_pool(svc);
if (ptlrpc_threads_need_create(svc)) {
while (!cfs_test_bit(HRT_STOPPING, &t->hrt_flags)) {
- l_cfs_wait_event(t->hrt_wait, hrt_dont_sleep(t, &replies));
+ l_wait_condition(t->hrt_wait, hrt_dont_sleep(t, &replies));
while (!cfs_list_empty(&replies)) {
struct ptlrpc_reply_state *rs;
args.cpu_index = cpu;
args.hrs = hr;
- rc = cfs_kernel_thread(ptlrpc_hr_main, (void*)&args,
- CLONE_VM|CLONE_FILES);
+ rc = cfs_create_thread(ptlrpc_hr_main, (void *)&args,
+ CFS_DAEMON_FLAGS);
if (rc < 0) {
cfs_complete(&t->hrt_completion);
GOTO(out, rc);
}
- l_cfs_wait_event(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
+ l_wait_condition(t->hrt_wait, cfs_test_bit(HRT_RUNNING, &t->hrt_flags));
RETURN(0);
out:
return rc;
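l_wait_condition() replaces l_cfs_wait_event() here; it is presumed to be the
no-timeout wrapper around l_wait_event(), along these lines:

/* Sketch, assuming the conventional zeroed l_wait_info. */
#define l_wait_condition(wq, condition)                 \
({                                                      \
        struct l_wait_info lwi = { 0 };                 \
        l_wait_event(wq, condition, &lwi);              \
})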
/* The CLONE_VM and CLONE_FILES flags (carried by CFS_DAEMON_FLAGS) just
* avoid a needless copy, because we drop the VM and FILES in
* cfs_daemonize_ctxt() right away.
*/
- rc = cfs_kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
+ rc = cfs_create_thread(ptlrpc_main, &d, CFS_DAEMON_FLAGS);
if (rc < 0) {
CERROR("cannot start thread '%s': rc %d\n", name, rc);
req = ptlrpc_server_request_get(service, 1);
cfs_list_del(&req->rq_list);
- service->srv_n_queued_reqs--;
service->srv_n_active_reqs++;
ptlrpc_hpreq_fini(req);
ptlrpc_server_finish_request(service, req);
cfs_list_for_each_entry_safe(rs, t, &service->srv_free_rs_list,
rs_list) {
cfs_list_del(&rs->rs_list);
- OBD_FREE(rs, service->srv_max_reply_size);
+ OBD_FREE_LARGE(rs, service->srv_max_reply_size);
}
/* In case somebody rearmed this in the meantime */