Whamcloud - gitweb
* refine local rpc code
authorniu <niu>
Thu, 15 Jan 2004 03:54:12 +0000 (03:54 +0000)
committerniu <niu>
Thu, 15 Jan 2004 03:54:12 +0000 (03:54 +0000)
* update from HEAD

lustre/ptlrpc/ptlrpc_local.c

index bf72175..99b618d 100644 (file)
 #include <portals/types.h>
 #include "ptlrpc_internal.h"
 
-
-struct ptlrpc_local_cs *ost_local_svc = NULL;
-struct ptlrpc_local_cs *mdt_local_svc = NULL;
-
 static int local_handle(struct ptlrpc_request *req, svc_handler_t handler)
 {
         int rc = 0;
         unsigned long flags;
         ENTRY;
-/*
-        SIGNAL_MASK_LOCK(current, flags);
-        sigfillset(&current->blocked);
-        RECALC_SIGPENDING;
-        SIGNAL_MASK_UNLOCK(current, flags);
-*/
+
         req->rq_export = class_conn2export(&req->rq_reqmsg->handle);
         if (req->rq_export) {
                 req->rq_connection = req->rq_export->exp_connection;
@@ -68,10 +59,11 @@ static int local_handle(struct ptlrpc_request *req, svc_handler_t handler)
                 req->rq_export->exp_last_request_time =
                         LTIME_S(CURRENT_TIME);
         } else {
+                /* connection must has been established */
+                LBUG();
                 /* create a (hopefully temporary) connection that will be used
                  * to send the reply if this call doesn't create an export.
                  * XXX revisit this when we revamp ptlrpc */
-                LBUG();
                 req->rq_connection =
                         ptlrpc_get_connection(&req->rq_peer, NULL);
         }
@@ -101,97 +93,10 @@ out_putconn:
 
 }
 
-static int localrpc_main(void *arg)
-{
-        struct ptlrpc_local_cs *svc = arg;
-        struct ptlrpc_thread *thread = svc->svc_thread;
-        svc_handler_t handler = svc->srv_handler;
-        unsigned long flags;
-        int rc;
-        
-        lock_kernel();
-        ptlrpc_daemonize();
-
-        SIGNAL_MASK_LOCK(current, flags);
-        sigfillset(&current->blocked);
-        RECALC_SIGPENDING;
-        SIGNAL_MASK_UNLOCK(current, flags);
-
-        THREAD_NAME(current->comm, "%s", "localrpc");
-        unlock_kernel();
-
-        thread->t_flags = SVC_RUNNING;
-        wake_up(&thread->t_ctl_waitq);
-
-        while (1) {
-                struct ptlrpc_request *req;
-                struct l_wait_info lwi = { 0 };
-                
-                l_wait_event(svc->req_waitq, 
-                             !list_empty(&svc->req_list) || 
-                             thread->t_flags & SVC_STOPPING, 
-                             &lwi);
-
-                if (thread->t_flags & SVC_STOPPING) {
-                        struct list_head *tmp, *n;
-                        list_for_each_safe(tmp, n, &svc->req_list) {
-                                req = list_entry(tmp, struct ptlrpc_request, 
-                                                 rq_list);
-                                list_del_init(&req->rq_list);
-                        }
-                        break;
-                }
-                
-                spin_lock_irqsave(&svc->req_lock, flags);
-                req = list_entry(svc->req_list.next, struct ptlrpc_request, rq_list);
-                list_del_init(&req->rq_list);
-                spin_unlock_irqrestore(&svc->req_lock, flags);
-
-                rc = local_handle(req, handler);
-                wake_up(&req->rq_reply_waitq);
-        
-        }
-        thread->t_flags = SVC_STOPPED;
-        wake_up(&thread->t_ctl_waitq);
-        RETURN(0);
-
-}       
-
-static int localrpc_start_thread(struct ptlrpc_local_cs *svc)
-{
-        struct l_wait_info lwi = { 0 };
-        struct ptlrpc_thread *thread;
-        int rc;
-        ENTRY;
-
-        OBD_ALLOC(thread, sizeof(*thread));
-        if (thread == NULL) 
-                RETURN(-ENOMEM);
-        init_waitqueue_head(&thread->t_ctl_waitq);
-
-        svc->svc_thread = thread;
-
-        /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
-         * just drop the VM and FILES in ptlrpc_daemonize() right away.
-         */
-        rc = kernel_thread(localrpc_main, svc, CLONE_VM | CLONE_FILES);
-        if (rc < 0) {
-                CERROR("cannot start thread: %d\n", rc);
-                GOTO(out_free, rc);
-        }
-        l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING, &lwi);
-        
-        RETURN(0);
-out_free:
-        OBD_FREE(thread, sizeof(*thread));
-        RETURN(rc);
-}
-
-static int local_svc_available(struct ptlrpc_request *req)
+int local_svc_available(struct ptlrpc_request *req)
 {
         int req_portal = req->rq_request_portal;
-        int req_opc = req->rq_reqmsg->opc;
-        int av_portal = 0, av_opc = 1;
+        int av_portal = 0;
 
         av_portal =  (req_portal == MDS_REQUEST_PORTAL ||
                       req_portal == MDS_SETATTR_PORTAL ||
@@ -199,161 +104,55 @@ static int local_svc_available(struct ptlrpc_request *req)
                       req_portal == OST_REQUEST_PORTAL ||
                       req_portal == OST_CREATE_PORTAL) ? 1 : 0;
         
-        /* XXX For debug: only LDLM_ENQUEUE available for local rpc */
-        av_opc = (req_opc == LDLM_ENQUEUE) ? 1 : 0;
-        
-        return av_portal & av_opc;
-
-}
-
-static int _local_rpc_init(struct ptlrpc_local_cs **local_svc, 
-                           svc_handler_t handler)
-{
-        int rc = 0;
-        struct ptlrpc_local_cs *svc = NULL;
-        
-        OBD_ALLOC(svc, sizeof(*svc));
-        if (svc == NULL)
-                RETURN( -ENOMEM);
-        svc->srv_handler = handler;
-        INIT_LIST_HEAD(&svc->req_list);
-        spin_lock_init(&svc->req_lock);
-        init_waitqueue_head(&svc->req_waitq);
-        
-        rc = localrpc_start_thread(svc);
-        if (rc) {
-                OBD_FREE(svc, sizeof(*svc));
-                RETURN(rc);
-        }
-
-        *local_svc = svc;
-        RETURN(rc);
-}
-
-int local_rpc_init(char *type, struct ptlrpc_local_cs **svc, 
-                   svc_handler_t handler)
-{
-        int rc = 0;
-
-        if (strcmp(type, "ost") == 0) {
-                if (ost_local_svc == NULL)
-                        rc = _local_rpc_init(&ost_local_svc, handler);
-                *svc = ost_local_svc;
-        
-        } else if (strcmp(type, "mdt") == 0) {
-                if (mdt_local_svc == NULL)
-                        rc = _local_rpc_init(&mdt_local_svc, handler);
-                *svc = mdt_local_svc;
-        } else {
-                LBUG();
-        }
-
-        RETURN(rc);
-}
-
-static void _local_rpc_cleanup(struct ptlrpc_local_cs *svc)
-{
-        unsigned long flags;
-        struct l_wait_info lw = { 0 };
-        
-        spin_lock_irqsave(&svc->req_lock, flags);
-        svc->svc_thread->t_flags = SVC_STOPPING;
-        spin_unlock_irqrestore(&svc->req_lock, flags);
-        wake_up(&svc->req_waitq);
-        
-        l_wait_event(svc->svc_thread->t_ctl_waitq,
-                     svc->svc_thread->t_flags & SVC_STOPPED,
-                     &lw);
-        
-        OBD_FREE(svc->svc_thread, sizeof(*svc->svc_thread));
-        OBD_FREE(svc, sizeof(*svc));
-}
+        return av_portal;
 
-void local_rpc_cleanup(char *type)
-{
-        if (strcmp(type, "ost") == 0 && ost_local_svc != NULL) {
-                _local_rpc_cleanup(ost_local_svc);
-                ost_local_svc = NULL;
-        } else if (strcmp(type, "mdt") == 0 && mdt_local_svc != NULL) {
-                _local_rpc_cleanup(mdt_local_svc);
-                mdt_local_svc = NULL;
-        }
 }
 
-#define SAME_THREAD
 int local_send_rpc(struct ptlrpc_request *req)
 {
-        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
         int rc;
         unsigned long flags;
-        struct l_wait_info lwi = { 0 };
         struct obd_ucred ucred;
         void *journal_info;
-        int ngroups;
+        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
         ENTRY;
         
-        /* XXX tight restriction for debug */
-        if (local_svc_available(req) && cli->local_rpc != NULL &&
-            req->rq_bulk == NULL) {
-                struct ptlrpc_local_cs *svc = cli->local_rpc;
+        LASSERT (req->rq_replen != 0);
+        req->rq_reqmsg->handle = req->rq_import->imp_remote_handle;
+        req->rq_reqmsg->conn_cnt = req->rq_import->imp_conn_cnt;
                 
-                req->rq_reqmsg->handle = req->rq_import->imp_remote_handle;
-                req->rq_reqmsg->conn_cnt = req->rq_import->imp_conn_cnt;
-                
-                LASSERT (req->rq_replen != 0);
-               
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_receiving_reply = 1;
-                req->rq_replied = 0;
-                req->rq_err = 0;
-                req->rq_timedout = 0;
-                req->rq_resend = 0;
-                req->rq_restart = 0;
-                spin_unlock_irqrestore (&request->rq_lock, flags);
-                
-                req->rq_sent = LTIME_S(CURRENT_TIME);
-//              ptlrpc_pinger_sending_on_import(req->rq_import);
-                req->rq_type = PTL_LOCAL_MSG_REQUEST;
-
-#ifndef SAME_THREAD        
-                spin_lock_irqsave(&svc->req_lock, flags);
-                list_del_init(&req->rq_list);
-                list_add_tail(&req->rq_list, &svc->req_list);
-                spin_unlock_irqrestore(&svc->req_lock, flags);
-                wake_up(&svc->req_waitq);
-                
-                l_wait_event(req->rq_reply_waitq, req->rq_replied, &lwi);
-#else
-
-/*
-                // for nesting journal
-                journal_info = current->journal_info;
-                current->journal_info = NULL;
-*/                
-                ucred.ouc_fsuid = current->fsuid;
-                ucred.ouc_fsgid = current->fsgid;
-                ucred.ouc_cap = current->cap_effective;
-                ngroups = current->ngroups;
-                current->fsuid = current->fsgid = 0;
-                current->ngroups = 0;
-                current->cap_effective = -1;
+        spin_lock_irqsave (&req->rq_lock, flags);
+        req->rq_receiving_reply = 1;
+        req->rq_replied = 0;
+        req->rq_err = 0;
+        req->rq_timedout = 0;
+        req->rq_resend = 0;
+        req->rq_restart = 0;
+        spin_unlock_irqrestore (&request->rq_lock, flags);
                 
-                rc = local_handle(req, svc->srv_handler);
+        req->rq_sent = LTIME_S(CURRENT_TIME);
+        ptlrpc_pinger_sending_on_import(req->rq_import);
+        req->rq_type = PTL_LOCAL_MSG_REQUEST;
+        
+        ucred.ouc_fsuid = current->fsuid;
+        ucred.ouc_fsgid = current->fsgid;
+        ucred.ouc_cap = current->cap_effective;
+        current->fsuid = current->fsgid = 0;
+        current->cap_effective = -1;
+        
+        /* for nesting journal on MDS and OST*/
+        journal_info = current->journal_info;
+        current->journal_info = NULL;
+        
+        rc = local_handle(req, cli->srv_handler);
+        
+        current->journal_info = journal_info;
                 
-                current->fsuid = ucred.ouc_fsuid;
-                current->fsgid = ucred.ouc_fsgid;
-                current->cap_effective = ucred.ouc_cap;
-                current->ngroups = ngroups;
+        current->fsuid = ucred.ouc_fsuid;
+        current->fsgid = ucred.ouc_fsgid;
+        current->cap_effective = ucred.ouc_cap;
                 
-//              current->journal_info = journal_info;
-#endif //SAME_THREAD
-                ptlrpc_lprocfs_rpc_sent(req);
-
-        } else {
-                rc = -EOPNOTSUPP;
-        }
-
+        ptlrpc_lprocfs_rpc_sent(req);
         RETURN(rc);
 }
 
@@ -405,6 +204,7 @@ int local_bulk_move(struct ptlrpc_bulk_desc *desc)
         char *src_addr, *dst_addr;
         int len, i;
         unsigned long flags;
+        ENTRY;
 
         if (desc->bd_type == BULK_GET_SINK) {
                 source = desc->bd_req->rq_bulk;
@@ -418,7 +218,7 @@ int local_bulk_move(struct ptlrpc_bulk_desc *desc)
 
         LASSERT(source);
         LASSERT(dest);
-        /* XXX need more investigation for sparse file case */
+        /* XXX need more investigation for sparse file case */
         if (source->bd_page_count != dest->bd_page_count)
                 goto done;
 
@@ -443,12 +243,7 @@ done:
         desc->bd_network_rw = 0;
         desc->bd_complete = 1;
         spin_unlock_irqrestore(&desc->bd_lock, flags);
-/*        
-        if (desc->bd_req->rq_set != NULL)
-                wake_up (&desc->bd_req->rq_set->set_waitq);
-        else
-                wake_up (&desc->bd_req->rq_reply_waitq);
-*/
+
         RETURN(0);
         
 }