#include <obd_support.h>
#include <obd_class.h>
#include <lustre_net.h>
+#include <lu_object.h>
#include <lnet/types.h>
#include "ptlrpc_internal.h"
{
struct ptlrpc_request_buffer_desc *rqbd;
- OBD_ALLOC(rqbd, sizeof (*rqbd));
+ OBD_ALLOC_PTR(rqbd);
if (rqbd == NULL)
return (NULL);
rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);
if (rqbd->rqbd_buffer == NULL) {
- OBD_FREE(rqbd, sizeof (*rqbd));
+ OBD_FREE_PTR(rqbd);
return (NULL);
}
spin_unlock(&svc->srv_lock);
ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
- OBD_FREE (rqbd, sizeof (*rqbd));
+ OBD_FREE_PTR(rqbd);
}
int
return (-1);
}
+/*
+ * Convenience wrapper around ptlrpc_init_svc(): unpacks the service
+ * tunables bundled in *c (buffer counts/sizes, portals, watchdog
+ * timeout, thread min/max, context tags) and forwards them, together
+ * with the caller-supplied handler, names and /proc entry, as the
+ * individual arguments ptlrpc_init_svc() expects.
+ *
+ * Returns whatever ptlrpc_init_svc() returns (presumably NULL on
+ * failure, judging by the allocation paths above — confirm against
+ * ptlrpc_init_svc()'s full definition).  Exported for use by other
+ * modules via EXPORT_SYMBOL.
+ */
+struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
+                                            svc_handler_t h, char *name,
+                                            struct proc_dir_entry *proc_entry,
+                                            svcreq_printfn_t prntfn,
+                                            char *threadname)
+{
+        return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
+                               c->psc_max_req_size, c->psc_max_reply_size,
+                               c->psc_req_portal, c->psc_rep_portal,
+                               c->psc_watchdog_timeout,
+                               h, name, proc_entry,
+                               prntfn, c->psc_min_threads, c->psc_max_threads,
+                               threadname, c->psc_ctx_tags);
+}
+EXPORT_SYMBOL(ptlrpc_init_svc_conf);
+
/* @threadname should be 11 characters or less - 3 will be added on */
struct ptlrpc_service *
ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
int req_portal, int rep_portal, int watchdog_timeout,
svc_handler_t handler, char *name,
cfs_proc_dir_entry_t *proc_entry,
- svcreq_printfn_t svcreq_printfn,
- int min_threads, int max_threads, char *threadname)
+ svcreq_printfn_t svcreq_printfn,
+ int min_threads, int max_threads,
+ char *threadname, __u32 ctx_tags)
{
int rc;
struct ptlrpc_service *service;
ENTRY;
LASSERT (nbufs > 0);
- LASSERT (bufsize >= max_req_size);
-
- OBD_ALLOC(service, sizeof(*service));
+ LASSERT (bufsize >= max_req_size + SPTLRPC_MAX_PAYLOAD);
+ LASSERT (ctx_tags != 0);
+
+ OBD_ALLOC_PTR(service);
if (service == NULL)
RETURN(NULL);
cfs_waitq_init(&service->srv_waitq);
service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 : nbufs;
- service->srv_max_req_size = max_req_size;
+ service->srv_max_req_size = max_req_size + SPTLRPC_MAX_PAYLOAD;
service->srv_buf_size = bufsize;
service->srv_rep_portal = rep_portal;
service->srv_req_portal = req_portal;
service->srv_threads_min = min_threads;
service->srv_threads_max = max_threads;
service->srv_thread_name = threadname;
+ service->srv_ctx_tags = ctx_tags;
rc = LNetSetLazyPortal(service->srv_req_portal);
LASSERT (rc == 0);
spin_lock (&ptlrpc_all_services_lock);
list_add (&service->srv_list, &ptlrpc_all_services);
spin_unlock (&ptlrpc_all_services_lock);
-
+
/* Now allocate the request buffers */
rc = ptlrpc_grow_req_bufs(service);
/* We shouldn't be under memory pressure at startup, so
/* Now allocate pool of reply buffers */
/* Increase max reply size to next power of two */
service->srv_max_reply_size = 1;
- while (service->srv_max_reply_size < max_reply_size)
+ while (service->srv_max_reply_size <
+ max_reply_size + SPTLRPC_MAX_PAYLOAD)
service->srv_max_reply_size <<= 1;
if (proc_entry != NULL)
req->rq_reply_state = NULL;
}
+ sptlrpc_svc_ctx_decref(req);
+
if (req != &rqbd->rqbd_req) {
/* NB request buffers use an embedded
* req if the incoming req unlinked the
exp->exp_last_request_time = max(exp->exp_last_request_time,
(time_t)CURRENT_SECONDS + extra_delay);
- CDEBUG(D_INFO, "updating export %s at %ld\n",
+ CDEBUG(D_HA, "updating export %s at %ld exp %p\n",
exp->exp_client_uuid.uuid,
- exp->exp_last_request_time);
+ exp->exp_last_request_time, exp);
/* exports may get disconnected from the chain even though the
export has references, so we must keep the spin lock while
EXIT;
}
+#ifndef __KERNEL__
+/*
+ * Userspace (non-__KERNEL__) stubs for the lu_context API used by the
+ * request-handling path (lu_context_init/enter/exit/fini calls visible
+ * in ptlrpc_server_handle_request and ptlrpc_main below).  In this
+ * build the context machinery is a no-op: init always reports success
+ * and the remaining operations do nothing, so the callers' control flow
+ * is unchanged.
+ */
+int lu_context_init(struct lu_context *ctx, __u32 tags)
+{
+        /* Nothing to set up outside the kernel; 0 == success. */
+        return 0;
+}
+
+void lu_context_fini(struct lu_context *ctx)
+{
+}
+
+void lu_context_enter(struct lu_context *ctx)
+{
+}
+
+void lu_context_exit(struct lu_context *ctx)
+{
+}
+
+#endif
+
static int
ptlrpc_server_handle_request(struct ptlrpc_service *svc,
struct ptlrpc_thread *thread)
LASSERT(svc);
spin_lock(&svc->srv_lock);
- if (list_empty (&svc->srv_request_queue) ||
- (svc->srv_n_difficult_replies != 0 &&
- svc->srv_n_active_reqs >= (svc->srv_threads_running - 1))) {
+ if (unlikely(list_empty (&svc->srv_request_queue) ||
+ (svc->srv_n_difficult_replies != 0 &&
+ svc->srv_n_active_reqs >= (svc->srv_threads_running - 1)))) {
/* If all the other threads are handling requests, I must
* remain free to handle any 'difficult' reply that might
* block them */
do_gettimeofday(&work_start);
timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
- if (svc->srv_stats != NULL) {
+ if (likely(svc->srv_stats != NULL)) {
lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR,
timediff);
lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR,
svc->srv_n_active_reqs);
}
+ /* go through security check/transform */
+ request->rq_auth_uid = INVALID_UID;
+ request->rq_auth_mapped_uid = INVALID_UID;
+
+ rc = sptlrpc_svc_unwrap_request(request);
+ switch (rc) {
+ case SECSVC_OK:
+ break;
+ case SECSVC_COMPLETE:
+ target_send_reply(request, 0, OBD_FAIL_MDS_ALL_REPLY_NET);
+ goto put_conn;
+ case SECSVC_DROP:
+ goto out_req;
+ default:
+ LBUG();
+ }
+
#if SWAB_PARANOIA
/* Clear request swab mask; this is a new request */
request->rq_req_swab_mask = 0;
goto out_req;
}
+ rc = lu_context_init(&request->rq_session, LCT_SESSION);
+ if (rc) {
+ CERROR("Failure to initialize session: %d\n", rc);
+ goto out_req;
+ }
+ request->rq_session.lc_thread = thread;
+ lu_context_enter(&request->rq_session);
+
CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
request->rq_svc_thread = thread;
+ if (thread)
+ request->rq_svc_thread->t_env->le_ses = &request->rq_session;
+
request->rq_export = class_conn2export(
lustre_msg_get_handle(request->rq_reqmsg));
- if (request->rq_export) {
- if (lustre_msg_get_conn_cnt(request->rq_reqmsg) <
- request->rq_export->exp_conn_cnt) {
+ if (likely(request->rq_export)) {
+ if (unlikely(lustre_msg_get_conn_cnt(request->rq_reqmsg) <
+ request->rq_export->exp_conn_cnt)) {
DEBUG_REQ(D_ERROR, request,
"DROPPING req from old connection %d < %d",
lustre_msg_get_conn_cnt(request->rq_reqmsg),
request->rq_export->exp_conn_cnt);
goto put_conn;
}
- if (request->rq_export->exp_obd &&
- request->rq_export->exp_obd->obd_fail) {
+ if (unlikely(request->rq_export->exp_obd &&
+ request->rq_export->exp_obd->obd_fail)) {
/* Failing over, don't handle any more reqs, send
error response instead. */
CDEBUG(D_HA, "Dropping req %p for failed obd %s\n",
ptlrpc_error(request);
goto put_conn;
}
-
ptlrpc_update_export_timer(request->rq_export, timediff/500000);
export = class_export_rpc_get(request->rq_export);
}
/* Discard requests queued for longer than my timeout. If the
* client's timeout is similar to mine, she'll be timing out this
* REQ anyway (bug 1502) */
- if (timediff / 1000000 > (long)obd_timeout) {
+ if (unlikely(timediff / 1000000 > (long)obd_timeout)) {
CERROR("Dropping timed-out opc %d request from %s"
": %ld seconds old\n",
lustre_msg_get_opc(request->rq_reqmsg),
put_rpc_export:
if (export != NULL)
class_export_rpc_put(export);
-
put_conn:
- if (request->rq_export != NULL)
+ if (likely(request->rq_export != NULL))
class_export_put(request->rq_export);
+ lu_context_exit(&request->rq_session);
+ lu_context_fini(&request->rq_session);
+
reply = request->rq_reply_state && request->rq_repmsg; /* bug 11169 */
do_gettimeofday(&work_end);
+
timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
- if (timediff / 1000000 > (long)obd_timeout)
+
+ if (unlikely(timediff / 1000000 > (long)obd_timeout))
CERROR("request "LPU64" opc %u from %s processed in %lds "
"trans "LPU64" rc %d/%d\n",
- request->rq_xid, lustre_msg_get_opc(request->rq_reqmsg),
+ request->rq_xid,
+ request->rq_reqmsg ?
+ lustre_msg_get_opc(request->rq_reqmsg) : 0,
libcfs_id2str(request->rq_peer),
cfs_timeval_sub(&work_end, &request->rq_arrival_time,
NULL) / 1000000,
reply ? lustre_msg_get_transno(request->rq_repmsg) :
- request->rq_transno,
- request->rq_status,
- reply ? lustre_msg_get_status(request->rq_repmsg): -999);
+ request->rq_transno, request->rq_status,
+ reply ? lustre_msg_get_status(request->rq_repmsg) : -999);
else
CDEBUG(D_HA, "request "LPU64" opc %u from %s processed in "
"%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
- request->rq_xid, lustre_msg_get_opc(request->rq_reqmsg),
+ request->rq_xid,
+ request->rq_reqmsg ?
+ lustre_msg_get_opc(request->rq_reqmsg) : 0,
libcfs_id2str(request->rq_peer), timediff,
cfs_timeval_sub(&work_end, &request->rq_arrival_time,
NULL),
request->rq_transno, request->rq_status,
- reply ? lustre_msg_get_status(request->rq_repmsg): -999);
+ reply ? lustre_msg_get_status(request->rq_repmsg) : -999);
- if (svc->srv_stats != NULL) {
+ if (likely(svc->srv_stats != NULL && request->rq_reqmsg != NULL)) {
int opc = opcode_offset(lustre_msg_get_opc(request->rq_reqmsg));
if (opc > 0) {
LASSERT(opc < LUSTRE_MAX_OPCODES);
#ifdef WITH_GROUP_INFO
struct group_info *ginfo = NULL;
#endif
+ struct lu_env env;
int rc = 0;
ENTRY;
ptlrpc_daemonize(data->name);
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,9) && defined(CONFIG_NUMA)
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,9) && defined CONFIG_NUMA
/* we need to do this before any per-thread allocation is done so that
* we get the per-thread allocations on local node. bug 7342 */
if (svc->srv_cpu_affinity) {
goto out;
}
+ rc = lu_context_init(&env.le_ctx, svc->srv_ctx_tags);
+ if (rc)
+ goto out_srv_fini;
+
+ thread->t_env = &env;
+ env.le_ctx.lc_thread = thread;
+
/* Alloc reply state structure for this one */
OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, CFS_ALLOC_STD);
if (!rs) {
rc = -ENOMEM;
- goto out_srv_init;
+ goto out_srv_fini;
}
/* Record that the thread is running */
spin_unlock(&svc->srv_lock);
cfs_waitq_signal(&svc->srv_free_rs_waitq);
- CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
- svc->srv_threads_running);
+ CDEBUG(D_NET, "service thread %d (#%d)started\n", thread->t_id,
+ svc->srv_threads_running);
/* XXX maintain a list of all managed devices: insert here */
lc_watchdog_disable(watchdog);
+ cond_resched();
+
l_wait_event_exclusive (svc->srv_waitq,
((thread->t_flags & SVC_STOPPING) != 0 &&
svc->srv_n_difficult_replies == 0) ||
* requests */
if (!list_empty (&svc->srv_request_queue) &&
(svc->srv_n_difficult_replies == 0 ||
- svc->srv_n_active_reqs < (svc->srv_threads_running - 1)))
+ svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
+ lu_context_enter(&env.le_ctx);
ptlrpc_server_handle_request(svc, thread);
+ lu_context_exit(&env.le_ctx);
+ }
if (!list_empty(&svc->srv_idle_rqbds) &&
ptlrpc_server_post_idle_rqbds(svc) < 0) {
lc_watchdog_delete(watchdog);
-out_srv_init:
+out_srv_fini:
/*
* deconstruct service specific state created by ptlrpc_start_thread()
*/
if (svc->srv_done != NULL)
svc->srv_done(thread);
+ lu_env_fini(&env);
out:
CDEBUG(D_NET, "service thread %d exiting: rc %d\n", thread->t_id, rc);
spin_lock(&svc->srv_lock);
- svc->srv_threads_running--; /* must know immediately */
+ svc->srv_threads_running--; /* must know immediately */
thread->t_id = rc;
thread->t_flags = SVC_STOPPED;
list_del(&thread->t_link);
spin_unlock(&svc->srv_lock);
- OBD_FREE(thread, sizeof(*thread));
+ OBD_FREE_PTR(thread);
}
void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
for (i = 0; i < svc->srv_threads_min; i++) {
rc = ptlrpc_start_thread(dev, svc);
if (rc) {
- CERROR("cannot start %s thread #%d: rc %d\n",
+ CERROR("cannot start %s thread #%d: rc %d\n",
svc->srv_thread_name, i, rc);
ptlrpc_stop_all_threads(svc);
}
CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
svc->srv_threads_max, svc->srv_threads_running);
- if (svc->srv_threads_started >= svc->srv_threads_max)
+ if (svc->srv_threads_started >= svc->srv_threads_max)
RETURN(-EMFILE);
- OBD_ALLOC(thread, sizeof(*thread));
+ OBD_ALLOC_PTR(thread);
if (thread == NULL)
RETURN(-ENOMEM);
cfs_waitq_init(&thread->t_ctl_waitq);
spin_lock(&svc->srv_lock);
if (svc->srv_threads_started >= svc->srv_threads_max) {
spin_unlock(&svc->srv_lock);
- OBD_FREE(thread, sizeof(*thread));
+ OBD_FREE_PTR(thread);
RETURN(-EMFILE);
}
list_add(&thread->t_link, &svc->srv_threads);
- id = ++svc->srv_threads_started;
+ id = svc->srv_threads_started++;
spin_unlock(&svc->srv_lock);
thread->t_id = id;
CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
- /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+ /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
* just drop the VM and FILES in ptlrpc_daemonize() right away.
*/
rc = cfs_kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
* its 'unlink' flag set for each posted rqbd */
list_for_each(tmp, &service->srv_active_rqbds) {
struct ptlrpc_request_buffer_desc *rqbd =
- list_entry(tmp, struct ptlrpc_request_buffer_desc,
+ list_entry(tmp, struct ptlrpc_request_buffer_desc,
rqbd_list);
rc = LNetMDUnlink(rqbd->rqbd_md_h);
OBD_FREE(rs, service->srv_max_reply_size);
}
- OBD_FREE(service, sizeof(*service));
+ OBD_FREE_PTR(service);
return 0;
}