Whamcloud - gitweb
64-bit warning fixes. Someone should take a closer look at ext2_obd.c
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index 1612597..c85db78 100644 (file)
  */
 
 #define EXPORT_SYMTAB
-
-#include <linux/config.h>
-#include <linux/module.h>
-#include <linux/kernel.h>
-
 #define DEBUG_SUBSYSTEM S_RPC
 
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
 #include <linux/lustre_net.h>
 
-extern int server_request_callback(ptl_event_t *ev, void *data);
+extern int request_in_callback(ptl_event_t *ev, void *data);
 extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start);
 
 static int ptlrpc_check_event(struct ptlrpc_service *svc)
 {
         int rc = 0;
+        ENTRY;
 
-        spin_lock(&svc->srv_lock); 
+        spin_lock(&svc->srv_lock);
         if (sigismember(&(current->pending.signal), SIGKILL) ||
-            sigismember(&(current->pending.signal), SIGSTOP) ||
-            sigismember(&(current->pending.signal), SIGCONT) ||
-            sigismember(&(current->pending.signal), SIGINT)) { 
+            sigismember(&(current->pending.signal), SIGTERM) ||
+            sigismember(&(current->pending.signal), SIGINT)) {
                 svc->srv_flags |= SVC_KILLED;
-                EXIT;
-                rc = 1;
-                goto out;
+                GOTO(out, rc = 1);
         }
 
-        if ( svc->srv_flags & SVC_STOPPING ) {
-                EXIT;
-                rc = 1;
-                goto out;
-        }
+        if (svc->srv_flags & SVC_STOPPING)
+                GOTO(out, rc = 1);
 
-        if (svc->srv_flags & SVC_EVENT)
-                LBUG();
-
-        if ( svc->srv_eq_h ) { 
+        if (ptl_is_valid_handle(&svc->srv_eq_h)) {
                 int err;
                 err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
 
-                if (err == PTL_OK) { 
+                if (err == PTL_OK) {
                         svc->srv_flags |= SVC_EVENT;
-                        EXIT;
-                        rc = 1;
-                        goto out;
+                        GOTO(out, rc = 1);
                 }
 
                 if (err != PTL_EQ_EMPTY) {
@@ -75,21 +58,12 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc)
                         LBUG();
                 }
 
-                EXIT;
-                rc = 0;
-                goto out;
-        }
-
-        if (!list_empty(&svc->srv_reqs)) {
-                svc->srv_flags |= SVC_LIST;
-                EXIT;
-                rc = 1;
-                goto out;
+                GOTO(out, rc = 0);
         }
 
         EXIT;
  out:
-        spin_unlock(&svc->srv_lock); 
+        spin_unlock(&svc->srv_lock);
         return rc;
 }
 
@@ -100,20 +74,18 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
         int err;
         int rc, i;
         struct ptlrpc_service *service;
+        ENTRY;
 
-        OBD_ALLOC(service, sizeof(*service)); 
-        if ( !service ) { 
-                CERROR("no memory\n");
+        OBD_ALLOC(service, sizeof(*service));
+        if (!service) {
                 LBUG();
-                return NULL;
+                RETURN(NULL);
         }
 
-        memset(service, 0, sizeof(*service)); 
-
         spin_lock_init(&service->srv_lock);
         INIT_LIST_HEAD(&service->srv_reqs);
-        init_waitqueue_head(&service->srv_ctl_waitq); 
-        init_waitqueue_head(&service->srv_waitq); 
+        init_waitqueue_head(&service->srv_ctl_waitq);
+        init_waitqueue_head(&service->srv_waitq);
 
         service->srv_thread = NULL;
         service->srv_flags = 0;
@@ -124,24 +96,22 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
         service->srv_handler = handler;
 
         err = kportal_uuid_to_peer(uuid, &service->srv_self);
-        if (err) { 
-                CERROR("cannot get peer for uuid %s", uuid); 
-                OBD_FREE(service, sizeof(*service));
-                return NULL; 
+        if (err) {
+                CERROR("cannot get peer for uuid %s", uuid);
+                GOTO(err_free, NULL);
         }
 
         service->srv_ring_length = RPC_RING_LENGTH;
         service->srv_id.nid = PTL_ID_ANY;
         service->srv_id.pid = PTL_ID_ANY;
 
-        rc = PtlEQAlloc(service->srv_self.peer_ni, 128, 
-                        server_request_callback,
+        rc = PtlEQAlloc(service->srv_self.peer_ni, 128, request_in_callback,
                         service, &(service->srv_eq_h));
 
         if (rc != PTL_OK) {
                 CERROR("PtlEQAlloc failed: %d\n", rc);
                 LBUG();
-                return NULL;
+                GOTO(err_free, NULL);
         }
 
         for (i = 0; i < service->srv_ring_length; i++) {
@@ -149,16 +119,85 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
                 if (service->srv_buf[i] == NULL) {
                         CERROR("no memory\n");
                         LBUG();
-                        return NULL;
+                        GOTO(err_ring, NULL);
                 }
-                service->srv_ref_count[i] = 0; 
-                ptlrpc_link_svc_me(service, i); 
+                service->srv_ref_count[i] = 0;
+                ptlrpc_link_svc_me(service, i);
         }
 
         CDEBUG(D_NET, "Starting service listening on portal %d\n",
                service->srv_req_portal);
 
-        return service;
+        RETURN(service);
+err_ring:
+        service->srv_ring_length = i;
+        rpc_unregister_service(service); // XXX verify this is right
+        PtlEQFree(service->srv_eq_h);
+err_free:
+        OBD_FREE(service, sizeof(*service));
+        return NULL;
+}
+
+static int handle_incoming_request(struct obd_device *obddev,
+                                   struct ptlrpc_service *svc)
+{
+        struct ptlrpc_request request;
+        struct lustre_peer peer;
+        void *start;
+        int rc;
+
+        /* FIXME: If we move to an event-driven model, we should put the request
+         * on the stack of mds_handle instead. */
+        start = svc->srv_ev.mem_desc.start;
+        memset(&request, 0, sizeof(request));
+        request.rq_obd = obddev;
+        request.rq_reqmsg = (svc->srv_ev.mem_desc.start +
+                             svc->srv_ev.offset);
+        request.rq_reqlen = svc->srv_ev.mem_desc.length;
+
+        if (request.rq_reqmsg->xid != svc->srv_ev.match_bits)
+                LBUG();
+
+        CDEBUG(D_NET, "got req %d\n", request.rq_reqmsg->xid);
+
+        peer.peer_nid = svc->srv_ev.initiator.nid;
+        /* FIXME: this NI should be the incoming NI.
+         * We don't know how to find that from here. */
+        peer.peer_ni = svc->srv_self.peer_ni;
+
+        if (request.rq_reqmsg->conn) {
+                request.rq_connection =
+                        (void *)(unsigned long)request.rq_reqmsg->conn;
+                if (request.rq_reqmsg->token !=
+                    request.rq_connection->c_token) {
+                        struct ptlrpc_connection *tmp;
+                        tmp = ptlrpc_get_connection(&peer);
+                        CERROR("rq_reqmsg->conn: %p\n", request.rq_connection);
+                        CERROR("real connection: %p\n", tmp);
+                        CERROR("rq_reqmsg->token: %Lu\n",
+                               (unsigned long long)request.rq_reqmsg->token);
+                        CERROR("real token      : %Lu\n",
+                               (unsigned long long)tmp->c_token);
+                        LBUG();
+                }
+                ptlrpc_connection_addref(request.rq_connection);
+        } else {
+                /*
+                  PHIL? should we perhaps only do this when 
+                   we get an incoming connmgr_connect request? 
+                */
+                request.rq_connection = ptlrpc_get_connection(&peer);
+                if (!request.rq_connection)
+                        LBUG();
+        }
+
+        svc->srv_flags &= ~SVC_EVENT;
+
+        spin_unlock(&svc->srv_lock);
+        rc = svc->srv_handler(obddev, svc, &request);
+        ptlrpc_put_connection(request.rq_connection);
+        ptl_handled_rpc(svc, start);
+        return rc;
 }
 
 static int ptlrpc_main(void *arg)
@@ -182,71 +221,39 @@ static int ptlrpc_main(void *arg)
         /* Record that the  thread is running */
         svc->srv_thread = current;
         svc->srv_flags = SVC_RUNNING;
-        wake_up(&svc->srv_ctl_waitq); 
+        wake_up(&svc->srv_ctl_waitq);
 
         /* XXX maintain a list of all managed devices: insert here */
 
         /* And now, loop forever on requests */
         while (1) {
                 wait_event(svc->srv_waitq, ptlrpc_check_event(svc));
-                
+
                 spin_lock(&svc->srv_lock);
                 if (svc->srv_flags & SVC_SIGNAL) {
-                        EXIT;
+                        svc->srv_flags &= ~SVC_SIGNAL;
                         spin_unlock(&svc->srv_lock);
+                        EXIT;
                         break;
                 }
 
                 if (svc->srv_flags & SVC_STOPPING) {
-                        EXIT;
+                        svc->srv_flags &= ~SVC_STOPPING;
                         spin_unlock(&svc->srv_lock);
+                        EXIT;
                         break;
                 }
-
+                
                 if (svc->srv_flags & SVC_EVENT) { 
-                        struct ptlrpc_request request;
-                        void *start;
-                        svc->srv_flags = SVC_RUNNING; 
-
-                        /* FIXME: If we move to an event-driven model,
-                         * we should put the request on the stack of
-                         * mds_handle instead. */
-                        start = svc->srv_ev.mem_desc.start;
-                        memset(&request, 0, sizeof(request));
-                        request.rq_obd = obddev;
-                        request.rq_reqbuf = (svc->srv_ev.mem_desc.start +
-                                             svc->srv_ev.offset);
-                        request.rq_reqlen = svc->srv_ev.mem_desc.length;
-                        request.rq_xid = svc->srv_ev.match_bits;
-                        CDEBUG(D_NET, "got req %d\n", request.rq_xid);
-
-                        request.rq_peer.peer_nid = svc->srv_ev.initiator.nid;
-                        /* FIXME: this NI should be the incoming NI.
-                         * We don't know how to find that from here. */
-                        request.rq_peer.peer_ni = svc->srv_self.peer_ni;
                         svc->srv_flags &= ~SVC_EVENT;
-
-                        spin_unlock(&svc->srv_lock);
-                        rc = svc->srv_handler(obddev, svc, &request);
-                        ptl_handled_rpc(svc, start);
+                        rc = handle_incoming_request(obddev, svc);
                         continue;
                 }
 
-                if (svc->srv_flags & SVC_LIST) { 
-                        struct ptlrpc_request *request;
-                        svc->srv_flags = SVC_RUNNING; 
-
-                        request = list_entry(svc->srv_reqs.next,
-                                             struct ptlrpc_request,
-                                             rq_list);
-                        list_del(&request->rq_list);
-                        spin_unlock(&svc->srv_lock);
-                        rc = svc->srv_handler(obddev, svc, request);
-                        continue;
-                }
-                CERROR("unknown break in service"); 
+                CERROR("unknown break in service");
                 spin_unlock(&svc->srv_lock);
-                break; 
+                EXIT;
+                break;
         }
 
         svc->srv_thread = NULL;
@@ -261,7 +268,7 @@ void ptlrpc_stop_thread(struct ptlrpc_service *svc)
         svc->srv_flags = SVC_STOPPING;
 
         wake_up(&svc->srv_waitq);
-        wait_event_interruptible(svc->srv_ctl_waitq, 
+        wait_event_interruptible(svc->srv_ctl_waitq,
                                  (svc->srv_flags & SVC_STOPPED));
 }
 
@@ -279,30 +286,27 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
         init_waitqueue_head(&svc->srv_waitq);
 
         init_waitqueue_head(&svc->srv_ctl_waitq);
-        rc = kernel_thread(ptlrpc_main, (void *) &d, 
+        rc = kernel_thread(ptlrpc_main, (void *) &d,
                            CLONE_VM | CLONE_FS | CLONE_FILES);
-        if (rc < 0) { 
-                CERROR("cannot start thread\n"); 
-                return -EINVAL;
+        if (rc < 0) {
+                CERROR("cannot start thread\n");
+                RETURN(-EINVAL);
         }
         wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING);
 
-        EXIT;
-        return 0;
+        RETURN(0);
 }
 
-
-
 int rpc_unregister_service(struct ptlrpc_service *service)
 {
         int rc, i;
 
         for (i = 0; i < service->srv_ring_length; i++) {
-                if (service->srv_me_h[i]) { 
+                if (ptl_is_valid_handle(&(service->srv_me_h[i]))) {
                         rc = PtlMEUnlink(service->srv_me_h[i]);
                         if (rc)
                                 CERROR("PtlMEUnlink failed: %d\n", rc);
-                        service->srv_me_h[i] = 0;
+                        ptl_set_inv_handle(&(service->srv_me_h[i]));
                 }
 
                 if (service->srv_buf[i] != NULL)