Whamcloud - gitweb
Modifications for "circulating" request buffers (sized in lustre_net.h)
authoreeb <eeb>
Fri, 20 Sep 2002 21:05:50 +0000 (21:05 +0000)
committereeb <eeb>
Fri, 20 Sep 2002 21:05:50 +0000 (21:05 +0000)
lustre/include/linux/lustre_net.h
lustre/ldlm/ldlm_lockd.c
lustre/mds/handler.c
lustre/ost/ost_handler.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/service.c

index a5626e5..69c6732 100644 (file)
 #include <linux/lustre_ha.h>
 #include <linux/lustre_import.h>
 
-/* default rpc ring length */
-//#define RPC_RING_LENGTH    10
-#define RPC_REQUEST_QUEUE_DEPTH        1024
+/* The following constants determine how much memory is devoted to
+ * buffering in the lustre services.
+ *
+ * ?_NEVENTS            # event queue entries
+ *
+ * ?_NBUFS             # request buffers
+ * ?_BUFSIZE           # bytes in a single request buffer
+ * total memory = ?_NBUFS * ?_BUFSIZE
+ *
+ * ?_MAXREQSIZE         # maximum request service will receive
+ * larger messages will get dropped.
+ * request buffers are auto-unlinked when less than ?_MAXREQSIZE
+ * is left in them.
+ */
+
+#define LDLM_NEVENTS   1024
+#define LDLM_NBUFS     10
+#define LDLM_BUFSIZE   (64 * 1024)
+#define LDLM_MAXREQSIZE        1024
+
+#define MDS_NEVENTS    1024
+#define MDS_NBUFS      10
+#define MDS_BUFSIZE    (64 * 1024)
+#define MDS_MAXREQSIZE 1024
+
+#ifdef __arch_um__
+#define OST_NEVENTS    1024
+#define OST_NBUFS      10
+#define OST_BUFSIZE    (64 * 1024)
+#define OST_MAXREQSIZE (8 * 1024)
+#else
+#define OST_NEVENTS    4096
+#define OST_NBUFS      40
+#define OST_BUFSIZE    (128 * 1024)
+#define OST_MAXREQSIZE (8 * 1024)
+#endif
 
 struct ptlrpc_connection {
         struct list_head        c_link;
@@ -184,8 +217,10 @@ struct ptlrpc_thread {
 };
 
 struct ptlrpc_request_buffer_desc {
+        struct list_head       rqbd_list;
         struct ptlrpc_service *rqbd_service;
         ptl_handle_me_t        rqbd_me_h;
+        atomic_t               rqbd_refcount;
         char                  *rqbd_buffer;
 };
 
@@ -196,10 +231,12 @@ struct ptlrpc_service {
         /* incoming request buffers */
         /* FIXME: perhaps a list of EQs, if multiple NIs are used? */
 
-        struct ptlrpc_request_buffer_desc *srv_rqbds; /* all the request buffer descriptors */
+        __u32            srv_max_req_size;      /* biggest request to receive */
+        __u32            srv_buf_size;          /* # bytes in a request buffer */
+        struct list_head srv_rqbds;             /* all the request buffer descriptors */
+        __u32            srv_nrqbds;            /* # request buffers */
+        atomic_t         srv_nrqbds_receiving;  /* # request buffers posted for input */
 
-        __u32 srv_buf_size;                     /* # bytes in a request buffer */
-        __u32 srv_nbuffs;                       /* # request buffers */
         __u32 srv_req_portal;
         __u32 srv_rep_portal;
 
@@ -213,7 +250,6 @@ struct ptlrpc_service {
         wait_queue_head_t srv_waitq; /* all threads sleep on this */
 
         spinlock_t srv_lock;
-        struct list_head srv_reqs;
         struct list_head srv_threads;
         int (*srv_handler)(struct ptlrpc_request *req);
         char *srv_name;  /* only statically allocated strings here; we don't clean them */
@@ -274,7 +310,8 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err);
 
 /* rpc/service.c */
 struct ptlrpc_service *
-ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
+ptlrpc_init_svc(__u32 nevents, __u32 nbufs, __u32 bufsize, __u32 max_req_size, 
+                int req_portal, int rep_portal,
                 obd_uuid_t uuid, svc_handler_t, char *name);
 void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
index d7cb958..a6ede2f 100644 (file)
@@ -559,9 +559,10 @@ static int ldlm_setup(struct obd_device *obddev, obd_count len, void *buf)
         if (rc != 0)
                 GOTO(out_dec, rc);
         */
-        ldlm->ldlm_service = ptlrpc_init_svc(1024, 640, LDLM_REQUEST_PORTAL,
-                                             LDLM_REPLY_PORTAL, "self",
-                                             ldlm_callback_handler, "ldlm");
+        ldlm->ldlm_service = ptlrpc_init_svc(LDLM_NEVENTS, LDLM_NBUFS,
+                                             LDLM_BUFSIZE, LDLM_MAXREQSIZE,
+                                             LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL, 
+                                             "self", ldlm_callback_handler, "ldlm");
         if (!ldlm->ldlm_service)
                 GOTO(out_dec, rc = -ENOMEM);
 
index 113094a..539e50b 100644 (file)
@@ -1098,9 +1098,10 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(err_put, rc);
         }
 
-        mds->mds_service = ptlrpc_init_svc(1024, 640, MDS_REQUEST_PORTAL,
-                                           MDC_REPLY_PORTAL, "self",mds_handle,
-                                           "mds");
+        mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
+                                           MDS_BUFSIZE, MDS_MAXREQSIZE,
+                                           MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL, 
+                                           "self", mds_handle, "mds");
         if (!mds->mds_service) {
                 CERROR("failed to start service\n");
                 GOTO(err_fs, rc = -EINVAL);
index 03a58ea..2389655 100644 (file)
@@ -597,9 +597,10 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
                 GOTO(error_dec, err = -EINVAL);
         }
 
-        ost->ost_service = ptlrpc_init_svc(1024, 4096, OST_REQUEST_PORTAL,
-                                           OSC_REPLY_PORTAL, "self", ost_handle,
-                                           "ost");
+        ost->ost_service = ptlrpc_init_svc(OST_NEVENTS, OST_NBUFS,
+                                           OST_BUFSIZE, OST_MAXREQSIZE,
+                                           OST_REQUEST_PORTAL, OSC_REPLY_PORTAL, 
+                                           "self", ost_handle, "ost");
         if (!ost->ost_service) {
                 CERROR("failed to start service\n");
                 GOTO(error_disc, err = -EINVAL);
index bff7466..4e9b29c 100644 (file)
@@ -109,15 +109,34 @@ int request_in_callback(ptl_event_t *ev)
         struct ptlrpc_service *service = rqbd->rqbd_service;
 
         LASSERT ((ev->mem_desc.options & PTL_MD_IOV) == 0); /* requests always contiguous */
-
+        LASSERT (ev->type == PTL_EVENT_PUT);    /* we only enable puts */
+        LASSERT (atomic_read (&service->srv_nrqbds_receiving) > 0);
+        LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
+        
         if (ev->rlength != ev->mlength)
                 CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
                        ev->mlength, ev->rlength);
 
-        if (ev->type == PTL_EVENT_PUT)
-                wake_up(&service->srv_waitq);
+        if (ptl_is_valid_handle (&ev->unlinked_me))
+        {
+                /* This is the last request to be received into this
+                 * request buffer.  We don't bump the refcount, since the
+                 * thread servicing this event is effectively taking over
+                 * portals' reference.
+                 */
+                LASSERT (!memcmp (&ev->unlinked_me, &rqbd->rqbd_me_h, 
+                                  sizeof (ev->unlinked_me)));
+
+                if (atomic_dec_and_test (&service->srv_nrqbds_receiving)) /* we're off-air */
+                {
+                        CERROR ("All request buffers busy\n");
+                        LBUG();
+                }
+        }
         else
-                CERROR("Unexpected event type: %d\n", ev->type);
+                atomic_inc (&rqbd->rqbd_refcount); /* +1 ref for service thread */
+
+        wake_up(&service->srv_waitq);
 
         return 0;
 }
index 2351bdd..d7884f9 100644 (file)
@@ -398,6 +398,8 @@ void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
         ptl_md_t dummy;
         ptl_handle_md_t md_h;
 
+        LASSERT (atomic_read (&rqbd->rqbd_refcount) == 0);
+        
         /* Attach the leading ME on which we build the ring */
         rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal,
                          match_id, 0, ~0,
@@ -409,16 +411,22 @@ void ptlrpc_link_svc_me(struct ptlrpc_request_buffer_desc *rqbd)
 
         dummy.start         = rqbd->rqbd_buffer;
         dummy.length        = service->srv_buf_size;
-        dummy.max_offset    = service->srv_buf_size;
-        dummy.threshold     = 1;
-        dummy.options       = PTL_MD_OP_PUT;
+        dummy.max_size      = service->srv_max_req_size;
+        dummy.threshold     = PTL_MD_THRESH_INF;
+        dummy.options       = PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_AUTO_UNLINK;
         dummy.user_ptr      = rqbd;
         dummy.eventq        = service->srv_eq_h;
 
+        atomic_inc (&service->srv_nrqbds_receiving);
+        atomic_set (&rqbd->rqbd_refcount, 1);   /* 1 ref for portals */
+        
         rc = PtlMDAttach(rqbd->rqbd_me_h, dummy, PTL_UNLINK, &md_h);
         if (rc != PTL_OK) {
-                /* cleanup */
                 CERROR("PtlMDAttach failed: %d\n", rc);
                 LBUG();
+#warning proper cleanup required
+                PtlMEUnlink (rqbd->rqbd_me_h);
+                atomic_set (&rqbd->rqbd_refcount, 0);
+                atomic_dec (&service->srv_nrqbds_receiving);
         }
 }
index 81abcd7..cac9932 100644 (file)
@@ -64,7 +64,9 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc,
 }
 
 struct ptlrpc_service *
-ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
+ptlrpc_init_svc(__u32 nevents, __u32 nbufs, 
+                __u32 bufsize, __u32 max_req_size, 
+                int req_portal, int rep_portal,
                 obd_uuid_t uuid, svc_handler_t handler, char *name)
 {
         int err;
@@ -80,12 +82,15 @@ ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
 
         service->srv_name = name;
         spin_lock_init(&service->srv_lock);
-        INIT_LIST_HEAD(&service->srv_reqs);
         INIT_LIST_HEAD(&service->srv_threads);
         init_waitqueue_head(&service->srv_waitq);
 
+        service->srv_max_req_size = max_req_size;
         service->srv_buf_size = bufsize;
-        service->srv_nbuffs = nbuffs;
+        INIT_LIST_HEAD (&service->srv_rqbds);
+        service->srv_nrqbds = 0;
+        atomic_set (&service->srv_nrqbds_receiving, 0);
+
         service->srv_rep_portal = rep_portal;
         service->srv_req_portal = req_portal;
         service->srv_handler = handler;
@@ -97,36 +102,37 @@ ptlrpc_init_svc(__u32 bufsize, int nbuffs, int req_portal, int rep_portal,
                 RETURN(NULL);
         }
 
-        /* NB We need exactly 1 event for each buffer we queue */
-        rc = PtlEQAlloc(service->srv_self.peer_ni, service->srv_nbuffs,
+        rc = PtlEQAlloc(service->srv_self.peer_ni, nevents,
                         request_in_callback, &(service->srv_eq_h));
 
         if (rc != PTL_OK) {
                 CERROR("PtlEQAlloc failed: %d\n", rc);
-                LBUG();
                 OBD_FREE(service, sizeof(*service));
                 RETURN(NULL);
         }
 
-        OBD_ALLOC(service->srv_rqbds, service->srv_nbuffs *
-                  sizeof(struct ptlrpc_request_buffer_desc));
-        if (service->srv_rqbds == NULL) {
-                CERROR("no memory\n");
-                LBUG();
-                GOTO(failed, NULL);
-        }
+        for (i = 0; i < nbufs; i++) {
+                struct ptlrpc_request_buffer_desc *rqbd;
 
-        for (i = 0; i < service->srv_nbuffs; i++) {
-                struct ptlrpc_request_buffer_desc *rqbd =&service->srv_rqbds[i];
+                OBD_ALLOC (rqbd, sizeof (*rqbd));
+                if (rqbd == NULL)
+                {
+                        CERROR ("no memory\n");
+                        GOTO (failed, NULL);
+                }
 
                 rqbd->rqbd_service = service;
                 ptl_set_inv_handle (&rqbd->rqbd_me_h);
+                atomic_set (&rqbd->rqbd_refcount, 0);
                 OBD_ALLOC(rqbd->rqbd_buffer, service->srv_buf_size);
                 if (rqbd->rqbd_buffer == NULL) {
+                        OBD_FREE (rqbd, sizeof (*rqbd));
                         CERROR("no memory\n");
-                        LBUG();
                         GOTO(failed, NULL);
                 }
+                list_add (&rqbd->rqbd_list, &service->srv_rqbds);
+                service->srv_nrqbds++;
+                
                 ptlrpc_link_svc_me(rqbd);
         }
 
@@ -149,48 +155,47 @@ static int handle_incoming_request(struct obd_device *obddev,
 
         /* FIXME: If we move to an event-driven model, we should put the request
          * on the stack of mds_handle instead. */
+
+        LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
         LASSERT ((event->mem_desc.options & PTL_MD_IOV) == 0);
         LASSERT (rqbd->rqbd_service == svc);
         LASSERT (rqbd->rqbd_buffer == event->mem_desc.start);
-        LASSERT (event->offset == 0);
+        LASSERT (event->offset + event->mlength <= svc->srv_buf_size);
 
         memset(request, 0, sizeof(*request));
         request->rq_svc = svc;
         request->rq_obd = obddev;
         request->rq_xid = event->match_bits;
         request->rq_reqmsg = event->mem_desc.start + event->offset;
-        request->rq_reqlen = event->mem_desc.length;
+        request->rq_reqlen = event->mlength;
+
+        rc = -EINVAL;
 
         if (request->rq_reqlen < sizeof(struct lustre_msg)) {
                 CERROR("incomplete request (%d): ptl %d from "LPX64" xid "LPD64"\n",
                        request->rq_reqlen, svc->srv_req_portal,
                        event->initiator.nid, request->rq_xid);
-                spin_unlock(&svc->srv_lock);
-                RETURN(-EINVAL);
+                goto out;
         }
 
         if (NTOH__u32(request->rq_reqmsg->type) != PTL_RPC_MSG_REQUEST) {
                 CERROR("wrong packet type received (type=%u)\n",
                        request->rq_reqmsg->type);
-                LBUG();
-                spin_unlock(&svc->srv_lock);
-                RETURN(-EINVAL);
+                goto out;
         }
 
         if (request->rq_reqmsg->magic != PTLRPC_MSG_MAGIC) {
                 CERROR("wrong lustre_msg magic %d: ptl %d from "LPX64" xid "LPD64"\n",
                        request->rq_reqmsg->magic, svc->srv_req_portal,
                        event->initiator.nid, request->rq_xid);
-                spin_unlock(&svc->srv_lock);
-                RETURN(-EINVAL);
+                goto out;
         }
 
         if (request->rq_reqmsg->version != PTLRPC_MSG_VERSION) {
                 CERROR("wrong lustre_msg version %d: ptl %d from "LPX64" xid "LPD64"\n",
                        request->rq_reqmsg->version, svc->srv_req_portal,
                        event->initiator.nid, request->rq_xid);
-                spin_unlock(&svc->srv_lock);
-                RETURN(-EINVAL);
+                goto out;
         }
 
         CDEBUG(D_NET, "got req "LPD64"\n", request->rq_xid);
@@ -208,12 +213,13 @@ static int handle_incoming_request(struct obd_device *obddev,
                 ptlrpc_connection_addref(request->rq_connection);
         }
 
-        spin_unlock(&svc->srv_lock);
-
         rc = svc->srv_handler(request);
         ptlrpc_put_connection(request->rq_connection);
 
-        ptlrpc_link_svc_me (rqbd);
+ out:
+        if (atomic_dec_and_test (&rqbd->rqbd_refcount)) /* last reference? */
+                ptlrpc_link_svc_me (rqbd);
+        
         return rc;
 }
 
@@ -257,25 +263,27 @@ static int ptlrpc_main(void *arg)
                 wait_event(svc->srv_waitq,
                            ptlrpc_check_event(svc, thread, event));
 
-                spin_lock(&svc->srv_lock);
-
                 if (thread->t_flags & SVC_STOPPING) {
+                        spin_lock(&svc->srv_lock);
                         thread->t_flags &= ~SVC_STOPPING;
                         spin_unlock(&svc->srv_lock);
+
                         EXIT;
                         break;
                 }
 
                 if (thread->t_flags & SVC_EVENT) {
-                        LASSERT (event->sequence != 0);
+                        spin_lock(&svc->srv_lock);
+                        thread->t_flags &= ~SVC_EVENT;
+                        spin_unlock(&svc->srv_lock);
+
                         rc = handle_incoming_request(obddev, svc, event,
                                                      request);
-                        thread->t_flags &= ~SVC_EVENT;
                         continue;
                 }
 
                 CERROR("unknown break in service");
-                spin_unlock(&svc->srv_lock);
+                LBUG();
                 EXIT;
                 break;
         }
@@ -358,43 +366,47 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
 
 int ptlrpc_unregister_service(struct ptlrpc_service *service)
 {
-        int rc, i;
-
-        /* NB service->srv_nbuffs gets set before we attempt (and possibly
-         * fail) to allocate srv_rqbds.
-         */
-        if (service->srv_rqbds != NULL) {
-                for (i = 0; i < service->srv_nbuffs; i++) {
-                        struct ptlrpc_request_buffer_desc *rqbd =
-                                &service->srv_rqbds[i];
-
-                        if (rqbd->rqbd_buffer == NULL) /* no buffer allocated */
-                                continue;             /* => never initialised */
-
-                        /* Buffer allocated => got linked */
-                        LASSERT (ptl_is_valid_handle (&rqbd->rqbd_me_h));
+        int rc;
 
-                        rc = PtlMEUnlink(rqbd->rqbd_me_h);
-                        if (rc)
-                                CERROR("PtlMEUnlink failed: %d\n", rc);
+        LASSERT (list_empty (&service->srv_threads));
 
-                        OBD_FREE(rqbd->rqbd_buffer, service->srv_buf_size);
-                }
-
-                OBD_FREE(service->srv_rqbds, service->srv_nbuffs *
-                         sizeof (struct ptlrpc_request_buffer_desc));
+        /* XXX We could reply (with failure) to all buffered requests
+         * _after_ unlinking _all_ the request buffers, but _before_
+         * freeing them.
+         */
+        
+        while (!list_empty (&service->srv_rqbds))
+        {
+                struct ptlrpc_request_buffer_desc *rqbd = 
+                        list_entry (service->srv_rqbds.next,
+                                    struct ptlrpc_request_buffer_desc,
+                                    rqbd_list);
+
+                list_del (&rqbd->rqbd_list);
+
+                LASSERT (atomic_read (&rqbd->rqbd_refcount) > 0);
+                /* refcount could be anything; it's possible for the
+                 * buffers to continued to get filled after all the server
+                 * threads exited.  But we know they _have_ exited.
+                 */
+
+                (void) PtlMEUnlink(rqbd->rqbd_me_h);
+                /* The callback handler could have unlinked this ME already
+                 * (we're racing with her) but it's safe to ensure it _has_
+                 * been unlinked.
+                 */
+
+                OBD_FREE (rqbd->rqbd_buffer, service->srv_buf_size);
+                OBD_FREE (rqbd, sizeof (*rqbd));
+                service->srv_nrqbds--;
         }
 
+        LASSERT (service->srv_nrqbds == 0);
+        
         rc = PtlEQFree(service->srv_eq_h);
         if (rc)
                 CERROR("PtlEQFree failed: %d\n", rc);
 
-        if (!list_empty(&service->srv_reqs)) {
-                // XXX reply with errors and clean up
-                CERROR("Request list not empty!\n");
-                rc = -EBUSY;
-        }
-
         OBD_FREE(service, sizeof(*service));
         if (rc)
                 LBUG();