Whamcloud - gitweb
- simplified ptlrpc_service by using ME auto-unlinking
authorbraam <braam>
Wed, 27 Mar 2002 20:31:22 +0000 (20:31 +0000)
committerbraam <braam>
Wed, 27 Mar 2002 20:31:22 +0000 (20:31 +0000)
- merged ptlrpc_init_srv and rpc_register_service

lustre/include/linux/lustre_net.h
lustre/mds/handler.c
lustre/ost/ost_handler.c
lustre/ptlrpc/events.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/service.c

index 330cb81..bd5e1fd 100644 (file)
@@ -99,28 +99,28 @@ struct ptlrpc_client {
 struct ptlrpc_request { 
         int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
         spinlock_t rq_lock;
-       struct list_head rq_list;
-       struct obd_device *rq_obd;
-       int rq_status;
+        struct list_head rq_list;
+        struct obd_device *rq_obd;
+        int rq_status;
         int rq_flags; 
         __u32 rq_connid;
         __u32 rq_xid;
 
-       char *rq_reqbuf;
-       int rq_reqlen;
-       struct ptlreq_hdr *rq_reqhdr;
-       union ptl_req rq_req;
+        char *rq_reqbuf;
+        int rq_reqlen;
+        struct ptlreq_hdr *rq_reqhdr;
+        union ptl_req rq_req;
 
-       char *rq_repbuf;
-       int rq_replen;
-       struct ptlrep_hdr *rq_rephdr;
-       union ptl_rep rq_rep;
+        char *rq_repbuf;
+        int rq_replen;
+        struct ptlrep_hdr *rq_rephdr;
+        union ptl_rep rq_rep;
 
         char *rq_bulkbuf;
         int rq_bulklen;
 
         void * rq_reply_handle;
-       wait_queue_head_t rq_wait_for_rep;
+        wait_queue_head_t rq_wait_for_rep;
 
         /* incoming reply */
         ptl_md_t rq_reply_md;
@@ -148,7 +148,7 @@ struct ptlrpc_bulk_desc {
         struct obd_conn b_conn;
         __u32 b_xid;
 
-       wait_queue_head_t b_waitq;
+        wait_queue_head_t b_waitq;
 
         ptl_md_t b_md;
         ptl_handle_md_t b_md_h;
@@ -159,17 +159,13 @@ struct ptlrpc_service {
         /* incoming request buffers */
         /* FIXME: perhaps a list of EQs, if multiple NIs are used? */
         char *srv_buf[RPC_RING_LENGTH];
+        __u32 srv_ref_count[RPC_RING_LENGTH];
+        ptl_handle_me_t srv_me_h[RPC_RING_LENGTH];
         __u32 srv_buf_size;
-       __u32 srv_me_tail;
-       __u32 srv_md_active;
         __u32 srv_ring_length;
         __u32 srv_req_portal;
         __u32 srv_rep_portal;
-        __u32 srv_ref_count[RPC_RING_LENGTH];
-        ptl_handle_me_t srv_me_h[RPC_RING_LENGTH];
-        ptl_process_id_t srv_id;
-        ptl_md_t srv_md[RPC_RING_LENGTH];
-        ptl_handle_md_t srv_md_h[RPC_RING_LENGTH];
+
         __u32 srv_xid;
 
         /* event queue */
@@ -177,13 +173,14 @@ struct ptlrpc_service {
 
         __u32 srv_flags; 
         struct lustre_peer srv_self;
-       struct task_struct *srv_thread;
-       wait_queue_head_t srv_waitq;
-       wait_queue_head_t srv_ctl_waitq;
-       int ost_flags;
+        ptl_process_id_t srv_id;
+
+        struct task_struct *srv_thread;
+        wait_queue_head_t srv_waitq;
+        wait_queue_head_t srv_ctl_waitq;
 
-       spinlock_t srv_lock;
-       struct list_head srv_reqs;
+        spinlock_t srv_lock;
+        struct list_head srv_reqs;
         ptl_event_t  srv_ev;
         req_unpack_t      srv_req_unpack;
         rep_pack_t        srv_rep_pack;
@@ -208,6 +205,7 @@ int ptlrpc_reply(struct obd_device *obddev, struct ptlrpc_service *svc,
 int ptlrpc_error(struct obd_device *obddev, struct ptlrpc_service *svc,
                  struct ptlrpc_request *req);
 int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer);
+void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i);
 
 /* rpc/client.c */
 int ptlrpc_connect_client(int dev, char *uuid, int req_portal, int rep_portal, 
index 92f4216..c589d2d 100644 (file)
@@ -505,8 +505,10 @@ static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
         mds->mds_service = ptlrpc_init_svc(64 * 1024,
                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
                                            "self", mds_handle);
-
-        rpc_register_service(mds->mds_service, "self");
+        if (!mds->mds_service) {
+                CERROR("failed to start service\n");
+                RETURN(-EINVAL);
+        }
 
         err = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
         if (err)
index b8b3bdf..6d818e0 100644 (file)
@@ -682,12 +682,11 @@ static int ost_setup(struct obd_device *obddev, obd_count len,
                                            OST_REQUEST_PORTAL, OSC_REPLY_PORTAL,
                                            "self", ost_handle);
         if (!ost->ost_service) {
+                CERROR("failed to start service\n");
                 obd_disconnect(&ost->ost_conn);
                 RETURN(-EINVAL);
         }
 
-        rpc_register_service(ost->ost_service, "self");
-
         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
         if (err) {
                 obd_disconnect(&ost->ost_conn);
index d07acd7..fc97e3a 100644 (file)
@@ -79,63 +79,43 @@ static int rcvd_reply_callback(ptl_event_t *ev, void *data)
 int server_request_callback(ptl_event_t *ev, void *data)
 {
         struct ptlrpc_service *service = data;
-        int rc;
+        int index;
 
         if (ev->rlength != ev->mlength)
                 CERROR("Warning: Possibly truncated rpc (%d/%d)\n",
                        ev->mlength, ev->rlength);
 
-        /* The ME is unlinked when there is less than 1024 bytes free
-         * on its MD.  This ensures we are always able to handle the rpc, 
-         * although the 1024 value is a guess as to the size of a
-         * large rpc (the known safe margin should be determined).
-         *
-         * NOTE: The portals API by default unlinks all MD's associated
-         *       with an ME when it's unlinked.  For now, this behavior
-         *       has been commented out of the portals library so the
-         *       MD can be unlinked when its ref count drops to zero.
-         *       A new MD and ME will then be created that use the same
-         *       kmalloc()'ed memory and inserted at the ring tail.
-         */
-
         spin_lock(&service->srv_lock); 
-        if ( ev->mem_desc.start != 
-             service->srv_md[service->srv_md_active].start ) {
+        for (index = 0; index < service->srv_ring_length; index++)
+                if ( service->srv_buf[index] == ev->mem_desc.start) 
+                        break;
+
+        if (index == service->srv_ring_length)
                 LBUG();
-        }
 
-        service->srv_ref_count[service->srv_md_active]++;
-        CDEBUG(D_INODE, "event offset %d buf size %d\n", 
-               ev->offset, service->srv_buf_size);
-        if (ev->offset >= (service->srv_buf_size - 1024)) {
-                CDEBUG(D_INODE, "Unlinking ME %d\n", service->srv_md_active);
+        service->srv_ref_count[index]++;
 
-                rc = PtlMEUnlink(service->srv_me_h[service->srv_md_active]);
-                service->srv_me_h[service->srv_md_active] = 0;
+        if (ev->unlinked_me != -1) {
+                int idx;
 
-                if (rc != PTL_OK) {
-                        CERROR("PtlMEUnlink failed - DROPPING soon: %d\n", rc);
+                for (idx = 0; idx < service->srv_ring_length; idx++)
+                        if (service->srv_me_h[idx] == ev->unlinked_me)
+                                break;
+                if (idx == service->srv_ring_length)
                         LBUG();
-                        spin_unlock(&service->srv_lock); 
-                        return rc;
-                }
 
-                service->srv_md_active = (service->srv_md_active + 1) % 
-                        service->srv_ring_length;
+                CDEBUG(D_NET, "unlinked %d\n", idx); 
+                service->srv_me_h[idx] = 0; 
 
-                if (service->srv_me_h[service->srv_md_active] == 0) { 
-                        CERROR("All %d ring ME's are unlinked!\n",
-                               service->srv_ring_length);
-                        LBUG();
-                }
+                if (service->srv_ref_count[idx] == 0)
+                        ptlrpc_link_svc_me(service, idx); 
         }
 
         spin_unlock(&service->srv_lock); 
-        if (ev->type == PTL_EVENT_PUT) {
+        if (ev->type == PTL_EVENT_PUT)
                 wake_up(&service->srv_waitq);
-        } else {
+        else
                 CERROR("Unexpected event type: %d\n", ev->type);
-        }
 
         return 0;
 }
@@ -180,9 +160,8 @@ static int bulk_sink_callback(ptl_event_t *ev, void *data)
         }
 
         /* FIXME: This should happen unconditionally */
-        if (bulk->b_cb != NULL) {
+        if (bulk->b_cb != NULL)
                 OBD_FREE(bulk, sizeof(*bulk));
-        }
 
         EXIT;
         return 1;
index 10dd72d..9738531 100644 (file)
@@ -158,8 +158,9 @@ int ptlrpc_register_bulk(struct ptlrpc_bulk_desc *bulk)
 
         ENTRY;
 
-        rc = PtlMEPrepend(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
-                          bulk->b_xid, 0, PTL_UNLINK, &bulk->b_me_h);
+        rc = PtlMEAttach(bulk->b_peer.peer_ni, bulk->b_portal, local_id,
+                          bulk->b_xid, 0, PTL_UNLINK, PTL_INS_AFTER, 
+                         &bulk->b_me_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMEAttach failed: %d\n", rc);
                 LBUG();
@@ -293,8 +294,8 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         local_id.pid = PTL_ID_ANY;
 
         //CERROR("sending req %d\n", request->rq_xid);
-        rc = PtlMEPrepend(peer->peer_ni, request->rq_reply_portal, local_id,
-                          request->rq_xid, 0, PTL_UNLINK,
+        rc = PtlMEAttach(peer->peer_ni, request->rq_reply_portal, local_id,
+                          request->rq_xid, 0, PTL_UNLINK, PTL_INS_AFTER,
                           &request->rq_reply_me_h);
         if (rc != PTL_OK) {
                 CERROR("PtlMEAttach failed: %d\n", rc);
@@ -334,21 +335,55 @@ int ptl_send_rpc(struct ptlrpc_request *request, struct lustre_peer *peer)
         return rc;
 }
 
+void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i)
+{
+        int rc;
+        ptl_md_t dummy;
+        ptl_handle_md_t md_h;
+
+        /* Attach the leading ME on which we build the ring */
+        rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal,
+                         service->srv_id, 0, ~0, PTL_RETAIN, PTL_INS_BEFORE,
+                         &(service->srv_me_h[i]));
+        if (rc != PTL_OK) {
+                CERROR("PtlMEAttach failed: %d\n", rc);
+                LBUG();
+        }
+        
+        if (service->srv_ref_count[i])
+                LBUG();
+
+        dummy.start         = service->srv_buf[i];
+        dummy.length        = service->srv_buf_size;
+        dummy.max_offset    = service->srv_buf_size;
+        dummy.threshold     = PTL_MD_THRESH_INF;
+        dummy.options       = PTL_MD_OP_PUT | PTL_MD_AUTO_UNLINK;
+        dummy.user_ptr      = service;
+        dummy.eventq        = service->srv_eq_h;
+        dummy.max_offset    = service->srv_buf_size;
+        
+        rc = PtlMDAttach(service->srv_me_h[i], dummy, PTL_UNLINK, &md_h);
+        if (rc != PTL_OK) {
+                /* cleanup */
+                CERROR("PtlMDAttach failed: %d\n", rc);
+                LBUG();
+        }
+}        
+
 /* ptl_handled_rpc() should be called by the sleeping process once
  * it finishes processing an event.  This ensures the ref count is
  * decremented and that the rpc ring buffer cycles properly.
  */ 
 int ptl_handled_rpc(struct ptlrpc_service *service, void *start) 
 {
-        int rc, index = 0;
+        int index;
 
         spin_lock(&service->srv_lock);
 
-        while (index < service->srv_ring_length) {
-                if ( service->srv_md[index].start == start) 
+        for (index = 0; index < service->srv_ring_length; index++)
+                if (service->srv_buf[index] == start) 
                         break;
-                index++;
-        }
+
         if (index == service->srv_ring_length)
                 LBUG();
 
@@ -360,44 +395,10 @@ int ptl_handled_rpc(struct ptlrpc_service *service, void *start)
                 LBUG();
         
         if (service->srv_ref_count[index] == 0 &&
-            service->srv_me_h[index] == 0) {
-
-                /* Replace the unlinked ME and MD */
-                rc = PtlMEInsert(service->srv_me_h[service->srv_me_tail],
-                                 service->srv_id, 0, ~0, PTL_RETAIN,
-                                 PTL_INS_AFTER, &(service->srv_me_h[index]));
-                if (rc != PTL_OK) {
-                        CERROR("PtlMEInsert failed: %d\n", rc);
-                        LBUG();
-                        spin_unlock(&service->srv_lock);
-                        return rc;
-                }
-                CDEBUG(D_NET, "Inserting new ME and MD in ring, rc %d\n", rc);
-
-                service->srv_me_tail = index;
-
-                service->srv_md[index].start        = service->srv_buf[index];
-                service->srv_md[index].length       = service->srv_buf_size;
-                service->srv_md[index].threshold    = PTL_MD_THRESH_INF;
-                service->srv_md[index].options      = PTL_MD_OP_PUT;
-                service->srv_md[index].user_ptr     = service;
-                service->srv_md[index].eventq       = service->srv_eq_h;
-
-                rc = PtlMDAttach(service->srv_me_h[index],
-                                 service->srv_md[index],
-                                 PTL_RETAIN, &(service->srv_md_h[index]));
-                //CERROR("MDAttach (request MDs): %Lu\n",
-                //(__u64)(service->srv_md_h[index]));
-
-                CDEBUG(D_INFO, "Attach MD in ring, rc %d\n", rc);
-                if (rc != PTL_OK) {
-                        /* XXX cleanup */
-                        CERROR("PtlMDAttach failed: %d\n", rc);
-                        LBUG();
-                        spin_unlock(&service->srv_lock);
-                        return rc;
-                }
-        } 
+            service->srv_me_h[index] == 0)  {
+                CDEBUG(D_NET, "relinking %d\n", index); 
+                ptlrpc_link_svc_me(service, index); 
+        }
         
         spin_unlock(&service->srv_lock);
         return 0;
index 83538e0..1612597 100644 (file)
@@ -98,36 +98,67 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
                 svc_handler_t handler)
 {
         int err;
-        struct ptlrpc_service *svc;
+        int rc, i;
+        struct ptlrpc_service *service;
 
-        OBD_ALLOC(svc, sizeof(*svc)); 
-        if ( !svc ) { 
+        OBD_ALLOC(service, sizeof(*service)); 
+        if ( !service ) { 
                 CERROR("no memory\n");
+                LBUG();
                 return NULL;
         }
 
-        memset(svc, 0, sizeof(*svc)); 
+        memset(service, 0, sizeof(*service)); 
 
-        spin_lock_init(&svc->srv_lock);
-        INIT_LIST_HEAD(&svc->srv_reqs);
-        init_waitqueue_head(&svc->srv_ctl_waitq); 
-        init_waitqueue_head(&svc->srv_waitq); 
+        spin_lock_init(&service->srv_lock);
+        INIT_LIST_HEAD(&service->srv_reqs);
+        init_waitqueue_head(&service->srv_ctl_waitq); 
+        init_waitqueue_head(&service->srv_waitq); 
 
-        svc->srv_thread = NULL;
-        svc->srv_flags = 0;
+        service->srv_thread = NULL;
+        service->srv_flags = 0;
 
-        svc->srv_buf_size = bufsize;
-        svc->srv_rep_portal = rep_portal;
-        svc->srv_req_portal = req_portal;
+        service->srv_buf_size = bufsize;
+        service->srv_rep_portal = rep_portal;
+        service->srv_req_portal = req_portal;
+        service->srv_handler = handler;
 
-        svc->srv_handler = handler;
-        err = kportal_uuid_to_peer(uuid, &svc->srv_self);
+        err = kportal_uuid_to_peer(uuid, &service->srv_self);
         if (err) { 
                 CERROR("cannot get peer for uuid %s", uuid); 
-                OBD_FREE(svc, sizeof(*svc));
+                OBD_FREE(service, sizeof(*service));
                 return NULL; 
         }
-        return svc;
+
+        service->srv_ring_length = RPC_RING_LENGTH;
+        service->srv_id.nid = PTL_ID_ANY;
+        service->srv_id.pid = PTL_ID_ANY;
+
+        rc = PtlEQAlloc(service->srv_self.peer_ni, 128, 
+                        server_request_callback,
+                        service, &(service->srv_eq_h));
+
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                LBUG();
+                return NULL;
+        }
+
+        for (i = 0; i < service->srv_ring_length; i++) {
+                OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
+                if (service->srv_buf[i] == NULL) {
+                        CERROR("no memory\n");
+                        LBUG();
+                        return NULL;
+                }
+                service->srv_ref_count[i] = 0; 
+                ptlrpc_link_svc_me(service, i); 
+        }
+
+        CDEBUG(D_NET, "Starting service listening on portal %d\n",
+               service->srv_req_portal);
+
+        return service;
 }
 
 static int ptlrpc_main(void *arg)
@@ -261,97 +292,18 @@ int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
 }
 
 
-int rpc_register_service(struct ptlrpc_service *service, char *uuid)
-{
-        struct lustre_peer peer;
-        int rc, i;
-
-        rc = kportal_uuid_to_peer(uuid, &peer);
-        if (rc != 0) {
-                CERROR("Invalid uuid \"%s\"\n", uuid);
-                return -EINVAL;
-        }
-
-        service->srv_ring_length = RPC_RING_LENGTH;
-        service->srv_md_active = 0;
-
-        service->srv_id.nid = PTL_ID_ANY;
-        service->srv_id.pid = PTL_ID_ANY;
-
-        rc = PtlEQAlloc(peer.peer_ni, 128, server_request_callback,
-                        service, &(service->srv_eq_h));
-
-        if (rc != PTL_OK) {
-                CERROR("PtlEQAlloc failed: %d\n", rc);
-                return rc;
-        }
-
-        CDEBUG(D_NET, "Starting service listening on portal %d\n",
-               service->srv_req_portal);
-
-        /* Attach the leading ME on which we build the ring */
-        rc = PtlMEAttach(peer.peer_ni, service->srv_req_portal,
-                         service->srv_id, 0, ~0, PTL_RETAIN,
-                         &(service->srv_me_h[0]));
-
-        if (rc != PTL_OK) {
-                CERROR("PtlMEAttach failed: %d\n", rc);
-                return rc;
-        }
-
-        for (i = 0; i < service->srv_ring_length; i++) {
-                OBD_ALLOC(service->srv_buf[i], service->srv_buf_size);
-
-                if (service->srv_buf[i] == NULL) {
-                        CERROR("no memory\n");
-                        return -ENOMEM;
-                }
-
-                /* Insert additional ME's to the ring */
-                if (i > 0) {
-                        rc = PtlMEInsert(service->srv_me_h[i - 1],
-                                         service->srv_id, 0, ~0, PTL_RETAIN,
-                                         PTL_INS_AFTER,
-                                         &(service->srv_me_h[i]));
-                        service->srv_me_tail = i;
-
-                        if (rc != PTL_OK) {
-                                CERROR("PtlMEInsert failed: %d\n", rc);
-                                return rc;
-                        }
-                }
-
-                service->srv_ref_count[i] = 0;
-                service->srv_md[i].start         = service->srv_buf[i];
-                service->srv_md[i].length        = service->srv_buf_size;
-                service->srv_md[i].max_offset    = service->srv_buf_size;
-                service->srv_md[i].threshold     = PTL_MD_THRESH_INF;
-                service->srv_md[i].options       = PTL_MD_OP_PUT;
-                service->srv_md[i].user_ptr      = service;
-                service->srv_md[i].eventq        = service->srv_eq_h;
-                service->srv_md[i].max_offset    = service->srv_buf_size;
-
-                rc = PtlMDAttach(service->srv_me_h[i], service->srv_md[i],
-                                 PTL_RETAIN, &(service->srv_md_h[i]));
-
-                if (rc != PTL_OK) {
-                        /* cleanup */
-                        CERROR("PtlMDAttach failed: %d\n", rc);
-                        return rc;
-                }
-        }
-
-        return 0;
-}
 
 int rpc_unregister_service(struct ptlrpc_service *service)
 {
         int rc, i;
 
         for (i = 0; i < service->srv_ring_length; i++) {
-                rc = PtlMEUnlink(service->srv_me_h[i]);
-                if (rc)
-                        CERROR("PtlMEUnlink failed: %d\n", rc);
+                if (service->srv_me_h[i]) { 
+                        rc = PtlMEUnlink(service->srv_me_h[i]);
+                        if (rc)
+                                CERROR("PtlMEUnlink failed: %d\n", rc);
+                        service->srv_me_h[i] = 0;
+                }
 
                 if (service->srv_buf[i] != NULL)
                         OBD_FREE(service->srv_buf[i], service->srv_buf_size);