Whamcloud - gitweb
WARNING: we currently crash on unmount after the last phase of runtests.
[fs/lustre-release.git] / lustre / ptlrpc / service.c
index fd92578..7fb4407 100644 (file)
  *
  */
 
-#define EXPORT_SYMTAB
-
-#include <linux/config.h>
-#include <linux/module.h>
-#include <linux/kernel.h>
-
 #define DEBUG_SUBSYSTEM S_RPC
 
 #include <linux/obd_support.h>
 #include <linux/obd_class.h>
 #include <linux/lustre_net.h>
 
-extern int server_request_callback(ptl_event_t *ev, void *data);
+extern int request_in_callback(ptl_event_t *ev);
 extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start);
 
-static int ptlrpc_check_event(struct ptlrpc_service *svc)
+/*
+ * Wake-up condition evaluated by a service thread inside wait_event()
+ * (see ptlrpc_main()).  The body runs under svc->srv_lock.
+ *
+ * Returns 1 when the thread has something to do:
+ *   - SVC_STOPPING is set in thread->t_flags, or
+ *   - PtlEQGet() on the service event queue returned PTL_OK; SVC_EVENT is
+ *     then set in thread->t_flags so the caller can tell the cases apart
+ *     (*event is the buffer that was handed to PtlEQGet()).
+ * Returns 0 when the queue is empty (PTL_EQ_EMPTY) or the queue handle is
+ * not valid.  Any other PtlEQGet() status is logged and hits LBUG().
+ */
+static int ptlrpc_check_event(struct ptlrpc_service *svc,
+                              struct ptlrpc_thread *thread, ptl_event_t *event)
 {
         int rc = 0;
+        ENTRY;
 
-        spin_lock(&svc->srv_lock); 
-        if (sigismember(&(current->pending.signal), SIGKILL) ||
-            sigismember(&(current->pending.signal), SIGSTOP) ||
-            sigismember(&(current->pending.signal), SIGCONT) ||
-            sigismember(&(current->pending.signal), SIGINT)) { 
-                svc->srv_flags |= SVC_KILLED;
-                EXIT;
-                rc = 1;
-                goto out;
-        }
-
-        if ( svc->srv_flags & SVC_STOPPING ) {
-                EXIT;
-                rc = 1;
-                goto out;
-        }
-
-        if (svc->srv_flags & SVC_EVENT)
-                BUG();
+        spin_lock(&svc->srv_lock);
+        if (thread->t_flags & SVC_STOPPING)
+                GOTO(out, rc = 1);
 
-        if ( svc->srv_eq_h ) { 
+        if (ptl_is_valid_handle(&svc->srv_eq_h)) {
                 int err;
-                err = PtlEQGet(svc->srv_eq_h, &svc->srv_ev);
+                err = PtlEQGet(svc->srv_eq_h, event);
 
-                if (err == PTL_OK) { 
-                        svc->srv_flags |= SVC_EVENT;
-                        EXIT;
-                        rc = 1;
-                        goto out;
+                if (err == PTL_OK) {
+                        thread->t_flags |= SVC_EVENT;
+                        GOTO(out, rc = 1);
                 }
 
                 if (err != PTL_EQ_EMPTY) {
-                        CDEBUG(D_NET, "BUG: PtlEQGet returned %d\n", rc);
-                        BUG();
+                        /* log the PtlEQGet() status; rc is still 0 here */
+                        CERROR("BUG: PtlEQGet returned %d\n", err);
+                        LBUG();
                 }
 
-                EXIT;
-                rc = 0;
-                goto out;
-        }
-
-        if (!list_empty(&svc->srv_reqs)) {
-                svc->srv_flags |= SVC_LIST;
-                EXIT;
-                rc = 1;
-                goto out;
+                GOTO(out, rc = 0);
         }
 
         EXIT;
  out:
-        spin_unlock(&svc->srv_lock); 
+        spin_unlock(&svc->srv_lock);
         return rc;
 }
 
 struct ptlrpc_service *
 ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
-                req_unpack_t unpack, rep_pack_t pack, svc_handler_t handler)
+                svc_handler_t handler, char *name)
 {
         int err;
-        struct ptlrpc_service *svc;
+        int rc, i;
+        struct ptlrpc_service *service;
+        ENTRY;
+        /* Build a service for requests arriving on req_portal: allocate the
+         * ptlrpc_service, record bufsize, both portals, handler and name,
+         * resolve uuid into srv_self, allocate the event queue (callback
+         * request_in_callback), then allocate srv_ring_length request
+         * buffers and pass each slot to ptlrpc_link_svc_me().
+         * Returns the new service, or NULL on any failure. */
+        OBD_ALLOC(service, sizeof(*service));
+        if (!service) {
+                LBUG();
+                RETURN(NULL);
+        }
+        /* name is stored by pointer, not copied.  NOTE(review): presumably
+         * the caller must keep it valid while the service exists -- confirm. */
+        service->srv_name = name;
+        spin_lock_init(&service->srv_lock);
+        INIT_LIST_HEAD(&service->srv_reqs);
+        INIT_LIST_HEAD(&service->srv_threads);
+        init_waitqueue_head(&service->srv_waitq);
+
+        service->srv_buf_size = bufsize;
+        service->srv_rep_portal = rep_portal;
+        service->srv_req_portal = req_portal;
+        service->srv_handler = handler;
+        /* srv_self.peer_ni filled in here is the interface the event queue
+         * is allocated on below */
+        err = kportal_uuid_to_peer(uuid, &service->srv_self);
+        if (err) {
+                CERROR("cannot get peer for uuid '%s'\n", uuid);
+                OBD_FREE(service, sizeof(*service));
+                RETURN(NULL);
+        }
+
+        service->srv_ring_length = RPC_RING_LENGTH;
+
+        rc = PtlEQAlloc(service->srv_self.peer_ni, 1024, request_in_callback,
+                        &(service->srv_eq_h));
+
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                LBUG();
+                OBD_FREE(service, sizeof(*service));
+                RETURN(NULL);
+        }
+        /* one buffer per ring slot, each starting with a zero reference
+         * count; a failed allocation leaves slot i without a buffer */
+        for (i = 0; i < service->srv_ring_length; i++) {
+                OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
+                if (service->srv_buf[i] == NULL) {
+                        CERROR("no memory\n");
+                        LBUG();
+                        GOTO(err_ring, NULL);
+                }
+                service->srv_ref_count[i] = 0;
+                ptlrpc_link_svc_me(service, i);
+        }
+
+        CDEBUG(D_NET, "Starting service listening on portal %d\n",
+               service->srv_req_portal);
+
+        RETURN(service);
+err_ring:
+        service->srv_ring_length = i;   /* only slots [0, i) were set up */
+        ptlrpc_unregister_service(service); /* NOTE(review): appears to free
+                                             * service as well -- confirm */
+        return NULL;
+}
+
+/*
+ * Turn one request event into an on-stack ptlrpc_request, validate the
+ * message header and pass the request to svc->srv_handler.
+ *
+ * Called with svc->srv_lock held (ptlrpc_main() takes it before the call);
+ * the lock is released on every path before returning.
+ *
+ * Returns the handler's result, or -EINVAL when the message is too short
+ * or carries the wrong magic or version.
+ *
+ * ptl_handled_rpc() is called on the error paths too, not only after the
+ * handler ran.  NOTE(review): this assumes ptl_handled_rpc() drops the
+ * buffer reference that ptlrpc_rotate_reqbufs() took for this event, so a
+ * rejected request does not pin its ring slot forever -- confirm.
+ *
+ * NOTE(review): rq_reqlen is taken from event->mem_desc.length, which looks
+ * like the size of the posted buffer rather than the number of bytes
+ * received -- confirm that the length check below tests what is intended.
+ */
+static int handle_incoming_request(struct obd_device *obddev,
+                                   struct ptlrpc_service *svc,
+                                   ptl_event_t *event)
+{
+        struct ptlrpc_request request;
+        void *start;
+        int rc;
+        ENTRY;
+
+        /* FIXME: If we move to an event-driven model, we should put the request
+         * on the stack of mds_handle instead. */
+        LASSERT ((event->mem_desc.options & PTL_MD_IOV) == 0);
+        start = event->mem_desc.start;
+
+        memset(&request, 0, sizeof(request));
+        request.rq_svc = svc;
+        request.rq_obd = obddev;
+        request.rq_xid = event->match_bits;
+        request.rq_reqmsg = event->mem_desc.start + event->offset;
+        request.rq_reqlen = event->mem_desc.length;
+
+        if (request.rq_reqlen < sizeof(struct lustre_msg)) {
+                CERROR("incomplete request (%d): ptl %d from %Lx xid %Ld\n",
+                       request.rq_reqlen, svc->srv_req_portal,
+                       event->initiator.nid, request.rq_xid);
+                spin_unlock(&svc->srv_lock);
+                GOTO(out, rc = -EINVAL);
+        }
+
+        if (request.rq_reqmsg->magic != PTLRPC_MSG_MAGIC) {
+                CERROR("wrong lustre_msg magic %d: ptl %d from %Lx xid %Ld\n",
+                       request.rq_reqmsg->magic, svc->srv_req_portal,
+                       event->initiator.nid, request.rq_xid);
+                spin_unlock(&svc->srv_lock);
+                GOTO(out, rc = -EINVAL);
+        }
+
+        if (request.rq_reqmsg->version != PTLRPC_MSG_VERSION) {
+                CERROR("wrong lustre_msg version %d: ptl %d from %Lx xid %Ld\n",
+                       request.rq_reqmsg->version, svc->srv_req_portal,
+                       event->initiator.nid, request.rq_xid);
+                spin_unlock(&svc->srv_lock);
+                GOTO(out, rc = -EINVAL);
+        }
+
+        CDEBUG(D_NET, "got req %Ld\n", request.rq_xid);
+
+        request.rq_peer.peer_nid = event->initiator.nid;
+        /* FIXME: this NI should be the incoming NI.
+         * We don't know how to find that from here. */
+        request.rq_peer.peer_ni = svc->srv_self.peer_ni;
+
+        request.rq_export = class_conn2export((struct lustre_handle *) request.rq_reqmsg);
 
-        OBD_ALLOC(svc, sizeof(*svc)); 
-        if ( !svc ) { 
-                CERROR("no memory\n");
-                return NULL;
+        if (request.rq_export) {
+                request.rq_connection = request.rq_export->exp_connection;
+                ptlrpc_connection_addref(request.rq_connection);
         }
 
-        memset(svc, 0, sizeof(*svc)); 
-
-        spin_lock_init(&svc->srv_lock);
-        INIT_LIST_HEAD(&svc->srv_reqs);
-        init_waitqueue_head(&svc->srv_ctl_waitq); 
-        init_waitqueue_head(&svc->srv_waitq); 
-
-        svc->srv_thread = NULL;
-        svc->srv_flags = 0;
-
-        svc->srv_buf_size = bufsize;
-        svc->srv_rep_portal = rep_portal;
-        svc->srv_req_portal = req_portal;
-        svc->srv_req_unpack = unpack;
-        svc->srv_rep_pack = pack;
-        svc->srv_handler = handler;
-        err = kportal_uuid_to_peer(uuid, &svc->srv_self);
-        if (err) { 
-                CERROR("cannot get peer for uuid %s", uuid); 
-                OBD_FREE(svc, sizeof(*svc));
-                return NULL; 
+        spin_unlock(&svc->srv_lock);
+        rc = svc->srv_handler(&request);
+        /* rq_connection is set (and referenced) only when an export was
+         * found above; after the memset it is NULL otherwise */
+        if (request.rq_connection != NULL)
+                ptlrpc_put_connection(request.rq_connection);
+out:
+        ptl_handled_rpc(svc, start);
+        RETURN(rc);
+}
+/* Per-event ring bookkeeping, called by ptlrpc_main() before the request is
+ * handled: take a reference on the ring buffer that ev refers to and, if
+ * the event reports an unlinked match entry, invalidate the stored handle
+ * for that slot and re-link the slot when nothing references its buffer.
+ * An event for a buffer or match entry that is not in the ring hits LBUG().
+ */
+void ptlrpc_rotate_reqbufs(struct ptlrpc_service *service,
+                            ptl_event_t *ev)
+{
+        int index;
+
+        LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0);
+        /* find the ring slot whose buffer this event refers to */
+        for (index = 0; index < service->srv_ring_length; index++)
+                if (service->srv_buf[index] == ev->mem_desc.start)
+                        break;
+        /* the loop ran off the end: the buffer is not one of ours */
+        if (index == service->srv_ring_length)
+                LBUG();
+        /* NOTE(review): presumably dropped again by ptl_handled_rpc() once
+         * the request has been handled -- confirm */
+        service->srv_ref_count[index]++;
+        /* the event names a match entry that was unlinked: locate its slot
+         * by handle_idx */
+        if (ptl_is_valid_handle(&ev->unlinked_me)) {
+                int idx;
+
+                for (idx = 0; idx < service->srv_ring_length; idx++)
+                        if (service->srv_me_h[idx].handle_idx ==
+                            ev->unlinked_me.handle_idx)
+                                break;
+                if (idx == service->srv_ring_length)
+                        LBUG();
+
+                CDEBUG(D_NET, "unlinked %d\n", idx);
+                ptl_set_inv_handle(&(service->srv_me_h[idx]));
+                /* re-link right away only if no request still references
+                 * this slot's buffer */
+                if (service->srv_ref_count[idx] == 0)
+                        ptlrpc_link_svc_me(service, idx);
         }
-        return svc;
 }
 
 static int ptlrpc_main(void *arg)
@@ -137,6 +234,7 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
         struct obd_device *obddev = data->dev;
         struct ptlrpc_service *svc = data->svc;
+        struct ptlrpc_thread *thread = data->thread;
 
         ENTRY;
 
@@ -148,213 +246,126 @@ static int ptlrpc_main(void *arg)
         spin_unlock_irq(&current->sigmask_lock);
 
         sprintf(current->comm, data->name);
+        unlock_kernel();
 
-        /* Record that the  thread is running */
-        svc->srv_thread = current;
-        svc->srv_flags = SVC_RUNNING;
-        wake_up(&svc->srv_ctl_waitq); 
+        /* Record that the thread is running */
+        thread->t_flags = SVC_RUNNING;
+        wake_up(&thread->t_ctl_waitq);
 
         /* XXX maintain a list of all managed devices: insert here */
 
         /* And now, loop forever on requests */
         while (1) {
-                wait_event(svc->srv_waitq, ptlrpc_check_event(svc));
-                
+                ptl_event_t event;
+
+                wait_event(svc->srv_waitq,
+                           ptlrpc_check_event(svc, thread, &event));
+
                 spin_lock(&svc->srv_lock);
-                if (svc->srv_flags & SVC_SIGNAL) {
-                        EXIT;
-                        spin_unlock(&svc->srv_lock);
-                        break;
-                }
 
-                if (svc->srv_flags & SVC_STOPPING) {
-                        EXIT;
+                if (thread->t_flags & SVC_STOPPING) {
+                        thread->t_flags &= ~SVC_STOPPING;
                         spin_unlock(&svc->srv_lock);
+                        EXIT;
                         break;
                 }
 
-                if (svc->srv_flags & SVC_EVENT) { 
-                        struct ptlrpc_request request;
-                        void *start;
-                        svc->srv_flags = SVC_RUNNING; 
-
-                        /* FIXME: If we move to an event-driven model,
-                         * we should put the request on the stack of
-                         * mds_handle instead. */
-                        start = svc->srv_ev.mem_desc.start;
-                        memset(&request, 0, sizeof(request));
-                        request.rq_obd = obddev;
-                        request.rq_reqbuf = (svc->srv_ev.mem_desc.start +
-                                             svc->srv_ev.offset);
-                        request.rq_reqlen = svc->srv_ev.mem_desc.length;
-                        request.rq_xid = svc->srv_ev.match_bits;
-                        CDEBUG(D_NET, "got req %d\n", request.rq_xid);
-
-                        request.rq_peer.peer_nid = svc->srv_ev.initiator.nid;
-                        /* FIXME: this NI should be the incoming NI.
-                         * We don't know how to find that from here. */
-                        request.rq_peer.peer_ni = svc->srv_self.peer_ni;
-                        svc->srv_flags &= ~SVC_EVENT;
+                if (thread->t_flags & SVC_EVENT) {
+                        thread->t_flags &= ~SVC_EVENT;
+                        ptlrpc_rotate_reqbufs(svc, &event);
 
-                        spin_unlock(&svc->srv_lock);
-                        rc = svc->srv_handler(obddev, svc, &request);
-                        ptl_handled_rpc(svc, start);
+                        rc = handle_incoming_request(obddev, svc, &event);
+                        thread->t_flags &= ~SVC_EVENT;
                         continue;
                 }
 
-                if (svc->srv_flags & SVC_LIST) { 
-                        struct ptlrpc_request *request;
-                        svc->srv_flags = SVC_RUNNING; 
-
-                        request = list_entry(svc->srv_reqs.next,
-                                             struct ptlrpc_request,
-                                             rq_list);
-                        list_del(&request->rq_list);
-                        spin_unlock(&svc->srv_lock);
-                        rc = svc->srv_handler(obddev, svc, request);
-                        continue;
-                }
-                CERROR("unknown break in service"); 
+                CERROR("unknown break in service");
                 spin_unlock(&svc->srv_lock);
-                break; 
+                EXIT;
+                break;
         }
 
-        svc->srv_thread = NULL;
-        svc->srv_flags = SVC_STOPPED;
-        wake_up(&svc->srv_ctl_waitq);
-        CDEBUG(D_NET, "svc exiting process %d\n", current->pid);
+        thread->t_flags = SVC_STOPPED;
+        wake_up(&thread->t_ctl_waitq);
+        CDEBUG(D_NET, "service thread exiting, process %d\n", current->pid);
         return 0;
 }
 
-void ptlrpc_stop_thread(struct ptlrpc_service *svc)
+static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
+                               struct ptlrpc_thread *thread)
 {
-        svc->srv_flags = SVC_STOPPING;
+        spin_lock(&svc->srv_lock);      /* Stop one service thread and wait
+                                         * for it to exit.  The flag is set
+                                         * under srv_lock, the lock that
+                                         * ptlrpc_check_event() holds while
+                                         * it tests t_flags. */
+        thread->t_flags = SVC_STOPPING; /* replaces every other flag bit */
+        spin_unlock(&svc->srv_lock);
 
         wake_up(&svc->srv_waitq);
-        wait_event_interruptible(svc->srv_ctl_waitq, 
-                                 (svc->srv_flags & SVC_STOPPED));
+        wait_event(thread->t_ctl_waitq, (thread->t_flags & SVC_STOPPED)); /* set by ptlrpc_main() on exit */
+}
+/* Stop and free every thread on svc->srv_threads, one at a time.  Each
+ * entry stays on the list while its thread is being stopped and is
+ * unlinked and freed only after ptlrpc_stop_thread() has returned.
+ */
+void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
+{
+        spin_lock(&svc->srv_lock);
+        while (!list_empty(&svc->srv_threads)) {
+                struct ptlrpc_thread *thread;
+                thread = list_entry(svc->srv_threads.next, struct ptlrpc_thread,
+                                    t_link);
+                spin_unlock(&svc->srv_lock); /* ptlrpc_stop_thread() takes
+                                              * srv_lock itself and then
+                                              * waits */
+                ptlrpc_stop_thread(svc, thread);
+                spin_lock(&svc->srv_lock);
+                list_del(&thread->t_link);
+                OBD_FREE(thread, sizeof(*thread));
+        }
+        spin_unlock(&svc->srv_lock);
 }
 
+/*
+ * Start one service thread for svc and wait until it is running.
+ *
+ * A ptlrpc_thread is allocated, linked on svc->srv_threads under
+ * srv_lock, and handed to ptlrpc_main() through the on-stack
+ * ptlrpc_svc_data.  The wait for SVC_RUNNING keeps that stack data alive
+ * until ptlrpc_main() has read it.
+ *
+ * Returns 0 on success, -ENOMEM if the thread descriptor cannot be
+ * allocated, -EINVAL if kernel_thread() fails.  On kernel_thread()
+ * failure the descriptor is unlinked from svc->srv_threads before it is
+ * freed, so the list never holds a pointer to freed memory.
+ */
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
-                                char *name)
+                        char *name)
 {
         struct ptlrpc_svc_data d;
+        struct ptlrpc_thread *thread;
         int rc;
         ENTRY;
 
+        OBD_ALLOC(thread, sizeof(*thread));
+        if (thread == NULL) {
+                LBUG();
+                RETURN(-ENOMEM);
+        }
+        init_waitqueue_head(&thread->t_ctl_waitq);
+
         d.dev = dev;
         d.svc = svc;
         d.name = name;
+        d.thread = thread;
 
-        init_waitqueue_head(&svc->srv_waitq);
+        spin_lock(&svc->srv_lock);
+        list_add(&thread->t_link, &svc->srv_threads);
+        spin_unlock(&svc->srv_lock);
 
-        init_waitqueue_head(&svc->srv_ctl_waitq);
-        rc = kernel_thread(ptlrpc_main, (void *) &d, 
+        rc = kernel_thread(ptlrpc_main, (void *) &d,
                            CLONE_VM | CLONE_FS | CLONE_FILES);
-        if (rc < 0) { 
-                CERROR("cannot start thread\n"); 
-                return -EINVAL;
+        if (rc < 0) {
+                CERROR("cannot start thread\n");
+                /* take the descriptor back off the list before freeing it */
+                spin_lock(&svc->srv_lock);
+                list_del(&thread->t_link);
+                spin_unlock(&svc->srv_lock);
+                OBD_FREE(thread, sizeof(*thread));
+                RETURN(-EINVAL);
         }
-        wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING);
+        wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING);
 
-        EXIT;
-        return 0;
+        RETURN(0);
 }
 
-
-int rpc_register_service(struct ptlrpc_service *service, char *uuid)
+int ptlrpc_unregister_service(struct ptlrpc_service *service)
 {
-        struct lustre_peer peer;
         int rc, i;
 
-        rc = kportal_uuid_to_peer(uuid, &peer);
-        if (rc != 0) {
-                CERROR("Invalid uuid \"%s\"\n", uuid);
-                return -EINVAL;
-        }
-
-        service->srv_ring_length = RPC_RING_LENGTH;
-        service->srv_md_active = 0;
-
-        service->srv_id.addr_kind = PTL_ADDR_GID;
-        service->srv_id.gid = PTL_ID_ANY;
-        service->srv_id.rid = PTL_ID_ANY;
-
-        rc = PtlEQAlloc(peer.peer_ni, 128, server_request_callback,
-                        service, &(service->srv_eq_h));
-
-        if (rc != PTL_OK) {
-                CERROR("PtlEQAlloc failed: %d\n", rc);
-                return rc;
-        }
-
-        CDEBUG(D_NET, "Starting service listening on portal %d\n",
-               service->srv_req_portal);
-
-        /* Attach the leading ME on which we build the ring */
-        rc = PtlMEAttach(peer.peer_ni, service->srv_req_portal,
-                         service->srv_id, 0, ~0, PTL_RETAIN,
-                         &(service->srv_me_h[0]));
-
-        if (rc != PTL_OK) {
-                CERROR("PtlMEAttach failed: %d\n", rc);
-                return rc;
-        }
-
         for (i = 0; i < service->srv_ring_length; i++) {
-                OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
-
-                if (service->srv_buf[i] == NULL) {
-                        CERROR("no memory\n");
-                        return -ENOMEM;
-                }
-
-
-                /* Insert additional ME's to the ring */
-                if (i > 0) {
-                        rc = PtlMEInsert(service->srv_me_h[i-1],
-                                         service->srv_id, 0, ~0, PTL_RETAIN,
-                                         PTL_INS_AFTER,&(service->srv_me_h[i]));
-                        service->srv_me_tail = i;
-
-                        if (rc != PTL_OK) {
-                                CERROR("PtlMEInsert failed: %d\n", rc);
-                                return rc;
-                        }
+                if (ptl_is_valid_handle(&(service->srv_me_h[i]))) {
+                        rc = PtlMEUnlink(service->srv_me_h[i]);
+                        if (rc)
+                                CERROR("PtlMEUnlink failed: %d\n", rc);
+                        ptl_set_inv_handle(&(service->srv_me_h[i]));
                 }
 
-                service->srv_ref_count[i] = 0;
-                service->srv_md[i].start         = service->srv_buf[i];
-                service->srv_md[i].length        = service->srv_buf_size;
-                service->srv_md[i].max_offset    = service->srv_buf_size;
-                service->srv_md[i].threshold     = PTL_MD_THRESH_INF;
-                service->srv_md[i].options       = PTL_MD_OP_PUT;
-                service->srv_md[i].user_ptr      = service;
-                service->srv_md[i].eventq        = service->srv_eq_h;
-                service->srv_md[i].max_offset    = service->srv_buf_size;
-
-                rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i],
-                                 PTL_RETAIN, &(service->srv_md_h[i]));
-
-                if (rc != PTL_OK) {
-                        /* cleanup */
-                        CERROR("PtlMDAttach failed: %d\n", rc);
-                        return rc;
-                }
-        }
-
-        return 0;
-}
-
-int rpc_unregister_service(struct ptlrpc_service *service)
-{
-        int rc, i;
-
-        for (i = 0; i < service->srv_ring_length; i++) {
-                rc = PtlMEUnlink(service->srv_me_h[i]);
-                if (rc)
-                        CERROR("PtlMEUnlink failed: %d\n", rc);
-
                 if (service->srv_buf[i] != NULL)
                         OBD_FREE(service->srv_buf[i], service->srv_buf_size);
                 service->srv_buf[i] = NULL;
@@ -364,5 +375,14 @@ int rpc_unregister_service(struct ptlrpc_service *service)
         if (rc)
                 CERROR("PtlEQFree failed: %d\n", rc);
 
-        return 0;
+        if (!list_empty(&service->srv_reqs)) {
+                // XXX reply with errors and clean up
+                CERROR("Request list not empty!\n");
+                rc = -EBUSY;
+        }
+
+        OBD_FREE(service, sizeof(*service));
+        if (rc) 
+                LBUG();
+        return rc;
 }