Whamcloud - gitweb
- added a 'dying' list head to fix a very bad bug in yesterday's request code
author pschwan <pschwan>
Tue, 30 Apr 2002 22:08:05 +0000 (22:08 +0000)
committer pschwan <pschwan>
Tue, 30 Apr 2002 22:08:05 +0000 (22:08 +0000)
- removed request->rq_lock (never used)
- made a ptlrpc_thread structure, and a list of those in ptlrpc_service
- adapted service code to support multithreading
- removed service->srv_id (duplicated existing local_id)
- updated llecho

lustre/include/linux/lustre_net.h
lustre/ldlm/ldlm_lockd.c
lustre/mds/handler.c
lustre/ost/ost_handler.c
lustre/ptlrpc/client.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/rpc.c
lustre/ptlrpc/service.c
lustre/tests/common.sh
lustre/tests/llecho.sh

index 944c29a..f8132e8 100644 (file)
@@ -111,7 +111,8 @@ struct ptlrpc_client {
         struct list_head cli_sent_head;
         struct list_head cli_replied_head;
         struct list_head cli_replay_head;
-        struct list_head cli_ha_item; 
+        struct list_head cli_dying_head;
+        struct list_head cli_ha_item;
         void (*cli_recover)(struct ptlrpc_client *); 
 
         struct recovd_obd *cli_recovd;
@@ -136,7 +137,6 @@ struct ptlrpc_client {
 
 struct ptlrpc_request { 
         int rq_type; /* one of PTL_RPC_REQUEST, PTL_RPC_REPLY, PTL_RPC_BULK */
-        spinlock_t rq_lock;
         struct list_head rq_list;
         struct obd_device *rq_obd;
         int rq_status;
@@ -190,6 +190,13 @@ struct ptlrpc_bulk_desc {
         ptl_handle_me_t b_me_h;
 };
 
+struct ptlrpc_thread {
+        struct list_head t_link;
+
+        __u32 t_flags; 
+        wait_queue_head_t t_ctl_waitq;
+};
+
 struct ptlrpc_service {
         time_t srv_time;
         time_t srv_timeout;
@@ -209,16 +216,13 @@ struct ptlrpc_service {
         /* event queue */
         ptl_handle_eq_t srv_eq_h;
 
-        __u32 srv_flags; 
         struct lustre_peer srv_self;
-        ptl_process_id_t srv_id;
 
-        struct task_struct *srv_thread;
-        wait_queue_head_t srv_waitq;
-        wait_queue_head_t srv_ctl_waitq;
+        wait_queue_head_t srv_waitq; /* all threads sleep on this */
 
         spinlock_t srv_lock;
         struct list_head srv_reqs;
+        struct list_head srv_threads;
         int (*srv_handler)(struct obd_device *obddev, 
                            struct ptlrpc_service *svc,
                            struct ptlrpc_request *req);
@@ -268,7 +272,7 @@ int ptlrpc_check_status(struct ptlrpc_request *req, int err);
 struct ptlrpc_service *
 ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
                 svc_handler_t);
-void ptlrpc_stop_thread(struct ptlrpc_service *svc);
+void ptlrpc_stop_all_threads(struct ptlrpc_service *svc);
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
                         char *name);
 int rpc_unregister_service(struct ptlrpc_service *service);
@@ -276,6 +280,7 @@ int rpc_unregister_service(struct ptlrpc_service *service);
 struct ptlrpc_svc_data { 
         char *name;
         struct ptlrpc_service *svc; 
+        struct ptlrpc_thread *thread;
         struct obd_device *dev;
 }; 
 
index fdf556b..730d606 100644 (file)
@@ -374,7 +374,7 @@ static int ldlm_cleanup(struct obd_device *obddev)
         struct ldlm_obd *ldlm = &obddev->u.ldlm;
         ENTRY;
 
-        ptlrpc_stop_thread(ldlm->ldlm_service);
+        ptlrpc_stop_all_threads(ldlm->ldlm_service);
         rpc_unregister_service(ldlm->ldlm_service);
 
         if (!list_empty(&ldlm->ldlm_service->srv_reqs)) {
index e8bf419..3e18d67 100644 (file)
@@ -926,7 +926,7 @@ static int mds_cleanup(struct obd_device * obddev)
                 RETURN(-EBUSY);
         }
 
-        ptlrpc_stop_thread(mds->mds_service);
+        ptlrpc_stop_all_threads(mds->mds_service);
         rpc_unregister_service(mds->mds_service);
         if (!list_empty(&mds->mds_service->srv_reqs)) {
                 // XXX reply with errors and clean up
index ac54fe6..ba71208 100644 (file)
@@ -598,6 +598,11 @@ static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
         err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
         if (err)
                 GOTO(error_disc, err = -EINVAL);
+#if 0
+        err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
+        if (err)
+                GOTO(error_disc, err = -EINVAL);
+#endif
 
         RETURN(0);
 
@@ -620,7 +625,7 @@ static int ost_cleanup(struct obd_device * obddev)
                 RETURN(-EBUSY);
         }
 
-        ptlrpc_stop_thread(ost->ost_service);
+        ptlrpc_stop_all_threads(ost->ost_service);
         rpc_unregister_service(ost->ost_service);
 
         if (!list_empty(&ost->ost_service->srv_reqs)) {
index e462b56..341b51c 100644 (file)
@@ -43,6 +43,7 @@ void ptlrpc_init_client(struct recovd_obd *recovd,
         INIT_LIST_HEAD(&cl->cli_sent_head);
         INIT_LIST_HEAD(&cl->cli_replied_head);
         INIT_LIST_HEAD(&cl->cli_replay_head);
+        INIT_LIST_HEAD(&cl->cli_dying_head);
         spin_lock_init(&cl->cli_lock);
         sema_init(&cl->cli_rpc_sem, 32);
 }
@@ -284,8 +285,8 @@ void ptlrpc_free_committed(struct ptlrpc_client *cli)
                         break; 
 
                 /* retain for replay if flagged */
+                list_del(&req->rq_list);
                 if (req->rq_flags & PTL_RPC_FL_RETAIN) {
-                        list_del(&req->rq_list); 
                         list_add(&req->rq_list, &cli->cli_replay_head);
                 } else {
                         CDEBUG(D_INFO, "Marking request %p as committed ("
@@ -293,6 +294,8 @@ void ptlrpc_free_committed(struct ptlrpc_client *cli)
                                req->rq_transno, cli->cli_last_committed);
                         if (atomic_dec_and_test(&req->rq_refcount))
                                 ptlrpc_free_req(req);
+                        else
+                                list_add(&req->rq_list, &cli->cli_dying_head);
                 }
         }
 
@@ -310,28 +313,35 @@ void ptlrpc_cleanup_client(struct ptlrpc_client *cli)
         list_for_each_safe(tmp, saved, &cli->cli_replied_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
                 /* We do this to prevent ptlrpc_free_req from taking cli_lock */
-                CDEBUG(D_INFO, "Cleaning req %p from replied head.\n", req);
+                CDEBUG(D_INFO, "Cleaning req %p from replied list.\n", req);
                 list_del(&req->rq_list);
                 req->rq_client = NULL;
                 ptlrpc_free_req(req); 
         }
         list_for_each_safe(tmp, saved, &cli->cli_sent_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                CDEBUG(D_INFO, "Cleaning req %p from sent head.\n", req);
+                CDEBUG(D_INFO, "Cleaning req %p from sent list.\n", req);
                 list_del(&req->rq_list);
                 req->rq_client = NULL;
                 ptlrpc_free_req(req); 
         }
         list_for_each_safe(tmp, saved, &cli->cli_replay_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                CERROR("Request %p is on the replay head at cleanup!\n", req);
+                CERROR("Request %p is on the replay list at cleanup!\n", req);
                 list_del(&req->rq_list);
                 req->rq_client = NULL;
                 ptlrpc_free_req(req); 
         }
         list_for_each_safe(tmp, saved, &cli->cli_sending_head) {
                 req = list_entry(tmp, struct ptlrpc_request, rq_list);
-                CDEBUG(D_INFO, "Cleaning req %p from sending head.\n", req);
+                CDEBUG(D_INFO, "Cleaning req %p from sending list.\n", req);
+                list_del(&req->rq_list);
+                req->rq_client = NULL;
+                ptlrpc_free_req(req); 
+        }
+        list_for_each_safe(tmp, saved, &cli->cli_dying_head) {
+                req = list_entry(tmp, struct ptlrpc_request, rq_list);
+                CERROR("Request %p is on the dying list at cleanup!\n", req);
                 list_del(&req->rq_list);
                 req->rq_client = NULL;
                 ptlrpc_free_req(req); 
index caf94a4..97ecc29 100644 (file)
@@ -323,7 +323,7 @@ void ptlrpc_link_svc_me(struct ptlrpc_service *service, int i)
 
         /* Attach the leading ME on which we build the ring */
         rc = PtlMEAttach(service->srv_self.peer_ni, service->srv_req_portal,
-                         service->srv_id, 0, ~0, PTL_RETAIN, PTL_INS_BEFORE,
+                         local_id, 0, ~0, PTL_RETAIN, PTL_INS_BEFORE,
                          &(service->srv_me_h[i]));
         if (rc != PTL_OK) {
                 CERROR("PtlMEAttach failed: %d\n", rc);
index 659845b..d21a480 100644 (file)
@@ -87,7 +87,7 @@ int connmgr_cleanup(struct obd_device *dev)
         if (err) 
                 LBUG();
 
-        ptlrpc_stop_thread(recovd->recovd_service);
+        ptlrpc_stop_all_threads(recovd->recovd_service);
         rpc_unregister_service(recovd->recovd_service);
         if (!list_empty(&recovd->recovd_service->srv_reqs)) {
                 // XXX reply with errors and clean up
index 46ea224..bd15d69 100644 (file)
@@ -28,7 +28,8 @@
 extern int request_in_callback(ptl_event_t *ev, void *data);
 extern int ptl_handled_rpc(struct ptlrpc_service *service, void *start);
 
-static int ptlrpc_check_event(struct ptlrpc_service *svc, ptl_event_t *event)
+static int ptlrpc_check_event(struct ptlrpc_service *svc,
+                              struct ptlrpc_thread *thread, ptl_event_t *event)
 {
         int rc = 0;
         ENTRY;
@@ -37,11 +38,11 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc, ptl_event_t *event)
         if (sigismember(&(current->pending.signal), SIGKILL) ||
             sigismember(&(current->pending.signal), SIGTERM) ||
             sigismember(&(current->pending.signal), SIGINT)) {
-                svc->srv_flags |= SVC_KILLED;
+                thread->t_flags |= SVC_KILLED;
                 GOTO(out, rc = 1);
         }
 
-        if (svc->srv_flags & SVC_STOPPING)
+        if (thread->t_flags & SVC_STOPPING)
                 GOTO(out, rc = 1);
 
         if (ptl_is_valid_handle(&svc->srv_eq_h)) {
@@ -49,7 +50,7 @@ static int ptlrpc_check_event(struct ptlrpc_service *svc, ptl_event_t *event)
                 err = PtlEQGet(svc->srv_eq_h, event);
 
                 if (err == PTL_OK) {
-                        svc->srv_flags |= SVC_EVENT;
+                        thread->t_flags |= SVC_EVENT;
                         GOTO(out, rc = 1);
                 }
 
@@ -84,12 +85,9 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
 
         spin_lock_init(&service->srv_lock);
         INIT_LIST_HEAD(&service->srv_reqs);
-        init_waitqueue_head(&service->srv_ctl_waitq);
+        INIT_LIST_HEAD(&service->srv_threads);
         init_waitqueue_head(&service->srv_waitq);
 
-        service->srv_thread = NULL;
-        service->srv_flags = 0;
-
         service->srv_buf_size = bufsize;
         service->srv_rep_portal = rep_portal;
         service->srv_req_portal = req_portal;
@@ -102,8 +100,6 @@ ptlrpc_init_svc(__u32 bufsize, int req_portal, int rep_portal, char *uuid,
         }
 
         service->srv_ring_length = RPC_RING_LENGTH;
-        service->srv_id.nid = PTL_ID_ANY;
-        service->srv_id.pid = PTL_ID_ANY;
 
         rc = PtlEQAlloc(service->srv_self.peer_ni, 128, request_in_callback,
                         service, &(service->srv_eq_h));
@@ -191,8 +187,6 @@ static int handle_incoming_request(struct obd_device *obddev,
                         LBUG();
         }
 
-        svc->srv_flags &= ~SVC_EVENT;
-
         spin_unlock(&svc->srv_lock);
         rc = svc->srv_handler(obddev, svc, &request);
         ptlrpc_put_connection(request.rq_connection);
@@ -206,6 +200,7 @@ static int ptlrpc_main(void *arg)
         struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
         struct obd_device *obddev = data->dev;
         struct ptlrpc_service *svc = data->svc;
+        struct ptlrpc_thread *thread = data->thread;
 
         ENTRY;
 
@@ -218,10 +213,9 @@ static int ptlrpc_main(void *arg)
 
         sprintf(current->comm, data->name);
 
-        /* Record that the  thread is running */
-        svc->srv_thread = current;
-        svc->srv_flags = SVC_RUNNING;
-        wake_up(&svc->srv_ctl_waitq);
+        /* Record that the thread is running */
+        thread->t_flags = SVC_RUNNING;
+        wake_up(&thread->t_ctl_waitq);
 
         /* XXX maintain a list of all managed devices: insert here */
 
@@ -229,26 +223,28 @@ static int ptlrpc_main(void *arg)
         while (1) {
                 ptl_event_t event;
 
-                wait_event(svc->srv_waitq, ptlrpc_check_event(svc, &event));
+                wait_event(svc->srv_waitq,
+                           ptlrpc_check_event(svc, thread, &event));
 
                 spin_lock(&svc->srv_lock);
-                if (svc->srv_flags & SVC_SIGNAL) {
-                        svc->srv_flags &= ~SVC_SIGNAL;
+                if (thread->t_flags & SVC_SIGNAL) {
+                        thread->t_flags &= ~SVC_SIGNAL;
                         spin_unlock(&svc->srv_lock);
                         EXIT;
                         break;
                 }
 
-                if (svc->srv_flags & SVC_STOPPING) {
-                        svc->srv_flags &= ~SVC_STOPPING;
+                if (thread->t_flags & SVC_STOPPING) {
+                        thread->t_flags &= ~SVC_STOPPING;
                         spin_unlock(&svc->srv_lock);
                         EXIT;
                         break;
                 }
                 
-                if (svc->srv_flags & SVC_EVENT) { 
-                        svc->srv_flags &= ~SVC_EVENT;
+                if (thread->t_flags & SVC_EVENT) { 
+                        thread->t_flags &= ~SVC_EVENT;
                         rc = handle_incoming_request(obddev, svc, &event);
+                        thread->t_flags &= ~SVC_EVENT;
                         continue;
                 }
 
@@ -258,43 +254,69 @@ static int ptlrpc_main(void *arg)
                 break;
         }
 
-        svc->srv_thread = NULL;
-        svc->srv_flags = SVC_STOPPED;
-        wake_up(&svc->srv_ctl_waitq);
-        CDEBUG(D_NET, "svc exiting process %d\n", current->pid);
+        thread->t_flags = SVC_STOPPED;
+        wake_up(&thread->t_ctl_waitq);
+        CDEBUG(D_NET, "service thread exiting, process %d\n", current->pid);
         return 0;
 }
 
-void ptlrpc_stop_thread(struct ptlrpc_service *svc)
+static void ptlrpc_stop_thread(struct ptlrpc_service *svc,
+                               struct ptlrpc_thread *thread)
 {
-        svc->srv_flags = SVC_STOPPING;
+        thread->t_flags = SVC_STOPPING;
 
         wake_up(&svc->srv_waitq);
-        wait_event_interruptible(svc->srv_ctl_waitq,
-                                 (svc->srv_flags & SVC_STOPPED));
+        wait_event_interruptible(thread->t_ctl_waitq,
+                                 (thread->t_flags & SVC_STOPPED));
+}
+
+void ptlrpc_stop_all_threads(struct ptlrpc_service *svc)
+{
+        spin_lock(&svc->srv_lock);
+        while (!list_empty(&svc->srv_threads)) {
+                struct ptlrpc_thread *thread;
+                thread = list_entry(svc->srv_threads.next, struct ptlrpc_thread,
+                                    t_link);
+                spin_unlock(&svc->srv_lock);
+                ptlrpc_stop_thread(svc, thread);
+                spin_lock(&svc->srv_lock);
+                list_del(&thread->t_link);
+                OBD_FREE(thread, sizeof(*thread));
+        }
+        spin_unlock(&svc->srv_lock);
 }
 
 int ptlrpc_start_thread(struct obd_device *dev, struct ptlrpc_service *svc,
-                                char *name)
+                        char *name)
 {
         struct ptlrpc_svc_data d;
+        struct ptlrpc_thread *thread;
         int rc;
         ENTRY;
 
+        OBD_ALLOC(thread, sizeof(*thread));
+        if (thread == NULL) {
+                LBUG();
+                RETURN(-ENOMEM);
+        }
+        init_waitqueue_head(&thread->t_ctl_waitq);
+
         d.dev = dev;
         d.svc = svc;
         d.name = name;
+        d.thread = thread;
 
-        init_waitqueue_head(&svc->srv_waitq);
+        spin_lock(&svc->srv_lock);
+        list_add(&thread->t_link, &svc->srv_threads);
+        spin_unlock(&svc->srv_lock);
 
-        init_waitqueue_head(&svc->srv_ctl_waitq);
         rc = kernel_thread(ptlrpc_main, (void *) &d,
                            CLONE_VM | CLONE_FS | CLONE_FILES);
         if (rc < 0) {
                 CERROR("cannot start thread\n");
                 RETURN(-EINVAL);
         }
-        wait_event(svc->srv_ctl_waitq, svc->srv_flags & SVC_RUNNING);
+        wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING);
 
         RETURN(0);
 }
index 4bf1505..b787432 100644 (file)
@@ -92,7 +92,7 @@ old_fs () {
 
 list_mods() {
        $DBGCTL modules > $R/tmp/ogdb
-       echo "The GDB module script is in /tmp/ogdb"
+       echo "The GDB module script is in $R/tmp/ogdb"
        [ "$DEBUG_WAIT" = "yes" ] && echo -n "Press ENTER to continue" && read
 }
 
index e0a2328..950ef71 100644 (file)
@@ -12,13 +12,13 @@ setup_portals
 setup_lustre
 
 $OBDCTL <<EOF
-device 0
+newdev
 attach obdecho OBDDEV
 setup
-device 1
+newdev
 attach ost OSTDEV
-setup 0
-device 2
+setup \$OBDDEV
+newdev
 attach osc OSCDEV
 setup -1
 quit