Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index c1bca21..f785ede 100644 (file)
@@ -31,6 +31,7 @@
 #include <obd_support.h>
 #include <obd_class.h>
 #include <lustre_net.h>
+#include <lu_object.h>
 #include <lnet/types.h>
 #include "ptlrpc_internal.h"
 
@@ -71,7 +72,7 @@ ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
 {
         struct ptlrpc_request_buffer_desc *rqbd;
 
-        OBD_ALLOC(rqbd, sizeof (*rqbd));
+        OBD_ALLOC_PTR(rqbd);
         if (rqbd == NULL)
                 return (NULL);
 
@@ -83,7 +84,7 @@ ptlrpc_alloc_rqbd (struct ptlrpc_service *svc)
         rqbd->rqbd_buffer = ptlrpc_alloc_request_buffer(svc->srv_buf_size);
 
         if (rqbd->rqbd_buffer == NULL) {
-                OBD_FREE(rqbd, sizeof (*rqbd));
+                OBD_FREE_PTR(rqbd);
                 return (NULL);
         }
 
@@ -109,7 +110,7 @@ ptlrpc_free_rqbd (struct ptlrpc_request_buffer_desc *rqbd)
         spin_unlock(&svc->srv_lock);
 
         ptlrpc_free_request_buffer (rqbd->rqbd_buffer, svc->srv_buf_size);
-        OBD_FREE (rqbd, sizeof (*rqbd));
+        OBD_FREE_PTR(rqbd);
 }
 
 int
@@ -251,23 +252,41 @@ ptlrpc_server_post_idle_rqbds (struct ptlrpc_service *svc)
         return (-1);
 }
 
+struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
+                                            svc_handler_t h, char *name,
+                                            struct proc_dir_entry *proc_entry,
+                                            svcreq_printfn_t prntfn,
+                                            char *threadname)
+{
+        return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
+                               c->psc_max_req_size, c->psc_max_reply_size,
+                               c->psc_req_portal, c->psc_rep_portal,
+                               c->psc_watchdog_timeout,
+                               h, name, proc_entry,
+                               prntfn, c->psc_min_threads, c->psc_max_threads,
+                               threadname, c->psc_ctx_tags);
+}
+EXPORT_SYMBOL(ptlrpc_init_svc_conf);
+
 /* @threadname should be 11 characters or less - 3 will be added on */
 struct ptlrpc_service *
 ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
                 int req_portal, int rep_portal, int watchdog_timeout,
                 svc_handler_t handler, char *name,
                 cfs_proc_dir_entry_t *proc_entry,
-                svcreq_printfn_t svcreq_printfn, 
-                int min_threads, int max_threads, char *threadname)
+                svcreq_printfn_t svcreq_printfn,
+                int min_threads, int max_threads,
+                char *threadname, __u32 ctx_tags)
 {
         int                    rc;
         struct ptlrpc_service *service;
         ENTRY;
 
         LASSERT (nbufs > 0);
-        LASSERT (bufsize >= max_req_size);
-        
-        OBD_ALLOC(service, sizeof(*service));
+        LASSERT (bufsize >= max_req_size + SPTLRPC_MAX_PAYLOAD);
+        LASSERT (ctx_tags != 0);
+
+        OBD_ALLOC_PTR(service);
         if (service == NULL)
                 RETURN(NULL);
 
@@ -279,7 +298,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         cfs_waitq_init(&service->srv_waitq);
 
         service->srv_nbuf_per_group = test_req_buffer_pressure ? 1 : nbufs;
-        service->srv_max_req_size = max_req_size;
+        service->srv_max_req_size = max_req_size + SPTLRPC_MAX_PAYLOAD;
         service->srv_buf_size = bufsize;
         service->srv_rep_portal = rep_portal;
         service->srv_req_portal = req_portal;
@@ -291,6 +310,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         service->srv_threads_min = min_threads;
         service->srv_threads_max = max_threads;
         service->srv_thread_name = threadname;
+        service->srv_ctx_tags = ctx_tags;
 
         rc = LNetSetLazyPortal(service->srv_req_portal);
         LASSERT (rc == 0);
@@ -308,7 +328,7 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         spin_lock (&ptlrpc_all_services_lock);
         list_add (&service->srv_list, &ptlrpc_all_services);
         spin_unlock (&ptlrpc_all_services_lock);
-        
+
         /* Now allocate the request buffers */
         rc = ptlrpc_grow_req_bufs(service);
         /* We shouldn't be under memory pressure at startup, so
@@ -319,7 +339,8 @@ ptlrpc_init_svc(int nbufs, int bufsize, int max_req_size, int max_reply_size,
         /* Now allocate pool of reply buffers */
         /* Increase max reply size to next power of two */
         service->srv_max_reply_size = 1;
-        while (service->srv_max_reply_size < max_reply_size)
+        while (service->srv_max_reply_size <
+               max_reply_size + SPTLRPC_MAX_PAYLOAD)
                 service->srv_max_reply_size <<= 1;
 
         if (proc_entry != NULL)
@@ -345,6 +366,8 @@ static void __ptlrpc_server_free_request(struct ptlrpc_request *req)
                 req->rq_reply_state = NULL;
         }
 
+        sptlrpc_svc_ctx_decref(req);
+
         if (req != &rqbd->rqbd_req) {
                 /* NB request buffers use an embedded
                  * req if the incoming req unlinked the
@@ -444,9 +467,9 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
         exp->exp_last_request_time = max(exp->exp_last_request_time,
                                          (time_t)CURRENT_SECONDS + extra_delay);
 
-        CDEBUG(D_INFO, "updating export %s at %ld\n",
+        CDEBUG(D_HA, "updating export %s at %ld exp %p\n",
                exp->exp_client_uuid.uuid,
-               exp->exp_last_request_time);
+               exp->exp_last_request_time, exp);
 
         /* exports may get disconnected from the chain even though the
            export has references, so we must keep the spin lock while
@@ -503,6 +526,26 @@ static void ptlrpc_update_export_timer(struct obd_export *exp, long extra_delay)
         EXIT;
 }
 
+#ifndef __KERNEL__
+int lu_context_init(struct lu_context *ctx, __u32 tags)
+{
+        return 0;
+}
+
+void lu_context_fini(struct lu_context *ctx)
+{
+}
+
+void lu_context_enter(struct lu_context *ctx)
+{
+}
+
+void lu_context_exit(struct lu_context *ctx)
+{
+}
+
+#endif
+
 static int
 ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                              struct ptlrpc_thread *thread)
@@ -518,9 +561,9 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         LASSERT(svc);
 
         spin_lock(&svc->srv_lock);
-        if (list_empty (&svc->srv_request_queue) ||
-            (svc->srv_n_difficult_replies != 0 &&
-             svc->srv_n_active_reqs >= (svc->srv_threads_running - 1))) {
+        if (unlikely(list_empty (&svc->srv_request_queue) ||
+                     (svc->srv_n_difficult_replies != 0 &&
+                      svc->srv_n_active_reqs >= (svc->srv_threads_running - 1)))) {
                 /* If all the other threads are handling requests, I must
                  * remain free to handle any 'difficult' reply that might
                  * block them */
@@ -538,7 +581,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 
         do_gettimeofday(&work_start);
         timediff = cfs_timeval_sub(&work_start, &request->rq_arrival_time,NULL);
-        if (svc->srv_stats != NULL) {
+        if (likely(svc->srv_stats != NULL)) {
                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQWAIT_CNTR,
                                     timediff);
                 lprocfs_counter_add(svc->srv_stats, PTLRPC_REQQDEPTH_CNTR,
@@ -547,6 +590,23 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                                     svc->srv_n_active_reqs);
         }
 
+        /* go through security check/transform */
+        request->rq_auth_uid = INVALID_UID;
+        request->rq_auth_mapped_uid = INVALID_UID;
+
+        rc = sptlrpc_svc_unwrap_request(request);
+        switch (rc) {
+        case SECSVC_OK:
+                break;
+        case SECSVC_COMPLETE:
+                target_send_reply(request, 0, OBD_FAIL_MDS_ALL_REPLY_NET);
+                goto put_conn;
+        case SECSVC_DROP:
+                goto out_req;
+        default:
+                LBUG();
+        }
+
 #if SWAB_PARANOIA
         /* Clear request swab mask; this is a new request */
         request->rq_req_swab_mask = 0;
@@ -575,23 +635,34 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                 goto out_req;
         }
 
+        rc = lu_context_init(&request->rq_session, LCT_SESSION);
+        if (rc) {
+                CERROR("Failure to initialize session: %d\n", rc);
+                goto out_req;
+        }
+        request->rq_session.lc_thread = thread;
+        lu_context_enter(&request->rq_session);
+
         CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
 
         request->rq_svc_thread = thread;
+        if (thread)
+                request->rq_svc_thread->t_env->le_ses = &request->rq_session;
+
         request->rq_export = class_conn2export(
                                      lustre_msg_get_handle(request->rq_reqmsg));
 
-        if (request->rq_export) {
-                if (lustre_msg_get_conn_cnt(request->rq_reqmsg) <
-                    request->rq_export->exp_conn_cnt) {
+        if (likely(request->rq_export)) {
+                if (unlikely(lustre_msg_get_conn_cnt(request->rq_reqmsg) <
+                             request->rq_export->exp_conn_cnt)) {
                         DEBUG_REQ(D_ERROR, request,
                                   "DROPPING req from old connection %d < %d",
                                   lustre_msg_get_conn_cnt(request->rq_reqmsg),
                                   request->rq_export->exp_conn_cnt);
                         goto put_conn;
                 }
-                if (request->rq_export->exp_obd &&
-                    request->rq_export->exp_obd->obd_fail) {
+                if (unlikely(request->rq_export->exp_obd &&
+                             request->rq_export->exp_obd->obd_fail)) {
                         /* Failing over, don't handle any more reqs, send
                            error response instead. */
                         CDEBUG(D_HA, "Dropping req %p for failed obd %s\n",
@@ -600,7 +671,6 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
                         ptlrpc_error(request);
                         goto put_conn;
                 }
-
                 ptlrpc_update_export_timer(request->rq_export, timediff/500000);
                 export = class_export_rpc_get(request->rq_export);
         }
@@ -608,7 +678,7 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
         /* Discard requests queued for longer than my timeout.  If the
          * client's timeout is similar to mine, she'll be timing out this
          * REQ anyway (bug 1502) */
-        if (timediff / 1000000 > (long)obd_timeout) {
+        if (unlikely(timediff / 1000000 > (long)obd_timeout)) {
                 CERROR("Dropping timed-out opc %d request from %s"
                        ": %ld seconds old\n",
                        lustre_msg_get_opc(request->rq_reqmsg),
@@ -646,37 +716,44 @@ ptlrpc_server_handle_request(struct ptlrpc_service *svc,
 put_rpc_export:
         if (export != NULL)
                 class_export_rpc_put(export);
-
 put_conn:
-        if (request->rq_export != NULL)
+        if (likely(request->rq_export != NULL))
                 class_export_put(request->rq_export);
 
+        lu_context_exit(&request->rq_session);
+        lu_context_fini(&request->rq_session);
+
         reply = request->rq_reply_state && request->rq_repmsg;  /* bug 11169 */
 
         do_gettimeofday(&work_end);
+
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
-        if (timediff / 1000000 > (long)obd_timeout)
+
+        if (unlikely(timediff / 1000000 > (long)obd_timeout))
                 CERROR("request "LPU64" opc %u from %s processed in %lds "
                        "trans "LPU64" rc %d/%d\n",
-                       request->rq_xid, lustre_msg_get_opc(request->rq_reqmsg),
+                       request->rq_xid,
+                       request->rq_reqmsg ?
+                                lustre_msg_get_opc(request->rq_reqmsg) : 0,
                        libcfs_id2str(request->rq_peer),
                        cfs_timeval_sub(&work_end, &request->rq_arrival_time,
                                        NULL) / 1000000,
                        reply ? lustre_msg_get_transno(request->rq_repmsg) :
-                               request->rq_transno,
-                       request->rq_status,
-                       reply ? lustre_msg_get_status(request->rq_repmsg): -999);
+                               request->rq_transno, request->rq_status,
+                       reply ? lustre_msg_get_status(request->rq_repmsg) : -999);
         else
                 CDEBUG(D_HA, "request "LPU64" opc %u from %s processed in "
                        "%ldus (%ldus total) trans "LPU64" rc %d/%d\n",
-                       request->rq_xid, lustre_msg_get_opc(request->rq_reqmsg),
+                       request->rq_xid,
+                       request->rq_reqmsg ?
+                                lustre_msg_get_opc(request->rq_reqmsg) : 0,
                        libcfs_id2str(request->rq_peer), timediff,
                        cfs_timeval_sub(&work_end, &request->rq_arrival_time,
                                        NULL),
                        request->rq_transno, request->rq_status,
-                       reply ? lustre_msg_get_status(request->rq_repmsg): -999);
+                       reply ? lustre_msg_get_status(request->rq_repmsg) : -999);
 
-        if (svc->srv_stats != NULL) {
+        if (likely(svc->srv_stats != NULL && request->rq_reqmsg != NULL)) {
                 int opc = opcode_offset(lustre_msg_get_opc(request->rq_reqmsg));
                 if (opc > 0) {
                         LASSERT(opc < LUSTRE_MAX_OPCODES);
@@ -882,12 +959,13 @@ static int ptlrpc_main(void *arg)
 #ifdef WITH_GROUP_INFO
         struct group_info *ginfo = NULL;
 #endif
+        struct lu_env env;
         int rc = 0;
         ENTRY;
 
         ptlrpc_daemonize(data->name);
 
-#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,9) && defined(CONFIG_NUMA)
+#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,9) && defined CONFIG_NUMA
         /* we need to do this before any per-thread allocation is done so that
          * we get the per-thread allocations on local node.  bug 7342 */
         if (svc->srv_cpu_affinity) {
@@ -921,11 +999,18 @@ static int ptlrpc_main(void *arg)
                         goto out;
         }
 
+        rc = lu_context_init(&env.le_ctx, svc->srv_ctx_tags);
+        if (rc)
+                goto out_srv_fini;
+
+        thread->t_env = &env;
+        env.le_ctx.lc_thread = thread;
+
         /* Alloc reply state structure for this one */
         OBD_ALLOC_GFP(rs, svc->srv_max_reply_size, CFS_ALLOC_STD);
         if (!rs) {
                 rc = -ENOMEM;
-                goto out_srv_init;
+                goto out_srv_fini;
         }
 
         /* Record that the thread is running */
@@ -944,8 +1029,8 @@ static int ptlrpc_main(void *arg)
         spin_unlock(&svc->srv_lock);
         cfs_waitq_signal(&svc->srv_free_rs_waitq);
 
-        CDEBUG(D_NET, "service thread %d (#%d) started\n", thread->t_id,
-               svc->srv_threads_running);
+        CDEBUG(D_NET, "service thread %d (#%d)started\n", thread->t_id,
+              svc->srv_threads_running);
 
         /* XXX maintain a list of all managed devices: insert here */
 
@@ -957,6 +1042,8 @@ static int ptlrpc_main(void *arg)
 
                 lc_watchdog_disable(watchdog);
 
+                cond_resched();
+
                 l_wait_event_exclusive (svc->srv_waitq,
                               ((thread->t_flags & SVC_STOPPING) != 0 &&
                                svc->srv_n_difficult_replies == 0) ||
@@ -987,8 +1074,11 @@ static int ptlrpc_main(void *arg)
                  * requests */
                 if (!list_empty (&svc->srv_request_queue) &&
                     (svc->srv_n_difficult_replies == 0 ||
-                     svc->srv_n_active_reqs < (svc->srv_threads_running - 1)))
+                     svc->srv_n_active_reqs < (svc->srv_threads_running - 1))) {
+                        lu_context_enter(&env.le_ctx);
                         ptlrpc_server_handle_request(svc, thread);
+                        lu_context_exit(&env.le_ctx);
+                }
 
                 if (!list_empty(&svc->srv_idle_rqbds) &&
                     ptlrpc_server_post_idle_rqbds(svc) < 0) {
@@ -1003,18 +1093,19 @@ static int ptlrpc_main(void *arg)
 
         lc_watchdog_delete(watchdog);
 
-out_srv_init:
+out_srv_fini:
         /*
          * deconstruct service specific state created by ptlrpc_start_thread()
          */
         if (svc->srv_done != NULL)
                 svc->srv_done(thread);
 
+        lu_env_fini(&env);
 out:
         CDEBUG(D_NET, "service thread %d exiting: rc %d\n", thread->t_id, rc);
 
         spin_lock(&svc->srv_lock);
-        svc->srv_threads_running--;              /* must know immediately */
+        svc->srv_threads_running--; /* must know immediately */
         thread->t_id = rc;
         thread->t_flags = SVC_STOPPED;
 
@@ -1041,7 +1132,7 @@ static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
         list_del(&thread->t_link);
         spin_unlock(&svc->srv_lock);
 
-        OBD_FREE(thread, sizeof(*thread));
+        OBD_FREE_PTR(thread);
 }
 
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
@@ -1070,7 +1161,7 @@ int ptlrpc_start_threads(struct obd_device *dev, struct ptlrpc_service *svc)
         for (i = 0; i < svc->srv_threads_min; i++) {
                 rc = ptlrpc_start_thread(dev, svc);
                 if (rc) {
-                        CERROR("cannot start %s thread #%d: rc %d\n",
+                        CERROR("cannot start %s thread #%d: rc %d\n", 
                                svc->srv_thread_name, i, rc);
                         ptlrpc_stop_all_threads(svc);
                 }
@@ -1090,10 +1181,10 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         CDEBUG(D_RPCTRACE, "%s started %d min %d max %d running %d\n",
                svc->srv_name, svc->srv_threads_started, svc->srv_threads_min,
                svc->srv_threads_max, svc->srv_threads_running);
-        if (svc->srv_threads_started >= svc->srv_threads_max) 
+        if (svc->srv_threads_started >= svc->srv_threads_max)
                 RETURN(-EMFILE);
 
-        OBD_ALLOC(thread, sizeof(*thread));
+        OBD_ALLOC_PTR(thread);
         if (thread == NULL)
                 RETURN(-ENOMEM);
         cfs_waitq_init(&thread->t_ctl_waitq);
@@ -1101,11 +1192,11 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
         spin_lock(&svc->srv_lock);
         if (svc->srv_threads_started >= svc->srv_threads_max) {
                 spin_unlock(&svc->srv_lock);
-                OBD_FREE(thread, sizeof(*thread));
+                OBD_FREE_PTR(thread);
                 RETURN(-EMFILE);
         }
         list_add(&thread->t_link, &svc->srv_threads);
-        id = ++svc->srv_threads_started;
+        id = svc->srv_threads_started++;
         spin_unlock(&svc->srv_lock);
 
         thread->t_id = id;
@@ -1117,7 +1208,7 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc)
 
         CDEBUG(D_RPCTRACE, "starting thread '%s'\n", name);
         
-        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+          /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
          * just drop the VM and FILES in ptlrpc_daemonize() right away.
          */
         rc = cfs_kernel_thread(ptlrpc_main, &d, CLONE_VM | CLONE_FILES);
@@ -1169,7 +1260,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
          * its 'unlink' flag set for each posted rqbd */
         list_for_each(tmp, &service->srv_active_rqbds) {
                 struct ptlrpc_request_buffer_desc *rqbd =
-                        list_entry(tmp, struct ptlrpc_request_buffer_desc, 
+                        list_entry(tmp, struct ptlrpc_request_buffer_desc,
                                    rqbd_list);
 
                 rc = LNetMDUnlink(rqbd->rqbd_md_h);
@@ -1259,7 +1350,7 @@ int ptlrpc_unregister_service(struct ptlrpc_service *service)
                 OBD_FREE(rs, service->srv_max_reply_size);
         }
 
-        OBD_FREE(service, sizeof(*service));
+        OBD_FREE_PTR(service);
         return 0;
 }