Whamcloud - gitweb
Land b_bug2503 onto HEAD (20040127_1701)
authoradilger <adilger>
Wed, 28 Jan 2004 00:25:08 +0000 (00:25 +0000)
committeradilger <adilger>
Wed, 28 Jan 2004 00:25:08 +0000 (00:25 +0000)
b=2503
r=zab,shaver

lustre/include/linux/lustre_net.h
lustre/include/linux/obd.h
lustre/include/linux/obd_class.h
lustre/obdclass/genops.c
lustre/osc/osc_internal.h
lustre/osc/osc_request.c
lustre/ptlrpc/client.c
lustre/ptlrpc/ptlrpc_module.c
lustre/ptlrpc/ptlrpcd.c

index 9d50fda..c98a831 100644 (file)
@@ -479,6 +479,7 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set);
 int ptlrpc_set_wait(struct ptlrpc_request_set *);
 int ptlrpc_expired_set(void *data);
 void ptlrpc_interrupted_set(void *data);
+void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
 void ptlrpc_set_destroy(struct ptlrpc_request_set *);
 void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
 void ptlrpc_set_add_new_req(struct ptlrpc_request_set *,
@@ -552,6 +553,7 @@ int ptlrpc_pinger_add_import(struct obd_import *imp);
 int ptlrpc_pinger_del_import(struct obd_import *imp);
 
 /* ptlrpc/ptlrpcd.c */
+void ptlrpcd_wake(void);
 void ptlrpcd_add_req(struct ptlrpc_request *req);
 int ptlrpcd_addref(void);
 void ptlrpcd_decref(void);
index e3dedb4..3982a4c 100644 (file)
@@ -111,14 +111,28 @@ struct obd_async_page_ops {
         void (*ap_completion)(void *data, int cmd, int rc);
 };
 
+/* the `osic' is passed down from a caller of obd rw methods.  the callee
+ * records enough state such that the caller can sleep on the osic and
+ * be woken when all the callees have finished their work */
 struct obd_sync_io_container {
         spinlock_t      osic_lock;
         atomic_t        osic_refcount;
         int             osic_pending;
         int             osic_rc;
+        struct list_head osic_occ_list;
         wait_queue_head_t osic_waitq;
 };
 
+/* the osic callback context lets the callee of obd rw methods register
+ * for callbacks from the caller. */
+struct osic_callback_context {
+        struct list_head occ_osic_item;
+        /* called when the caller has received a signal while sleeping.
+         * callees of this method are encouraged to abort their state 
+         * in the osic.  This may be called multiple times. */
+        void (*occ_interrupted)(struct osic_callback_context *occ);
+};
+
 /* if we find more consumers this could be generalized */
 #define OBD_HIST_MAX 32
 struct obd_histogram {
index c9dd7ba..de61f92 100644 (file)
@@ -67,8 +67,10 @@ struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
                                           struct obd_uuid *grp_uuid);
 
 void osic_init(struct obd_sync_io_container **osic);
-void osic_add_one(struct obd_sync_io_container *osic);
-void osic_complete_one(struct obd_sync_io_container *osic, int rc);
+void osic_add_one(struct obd_sync_io_container *osic,
+                  struct osic_callback_context *occ);
+void osic_complete_one(struct obd_sync_io_container *osic, 
+                       struct osic_callback_context *occ, int rc);
 void osic_release(struct obd_sync_io_container *osic);
 int osic_wait(struct obd_sync_io_container *osic);
 
index dc524cf..0c86eac 100644 (file)
@@ -631,6 +631,7 @@ void osic_init(struct obd_sync_io_container **osic_out)
         osic->osic_pending = 0;
         atomic_set(&osic->osic_refcount, 1);
         init_waitqueue_head(&osic->osic_waitq);
+        INIT_LIST_HEAD(&osic->osic_occ_list);
         *osic_out = osic;
 };
 
@@ -644,29 +645,40 @@ void osic_release(struct obd_sync_io_container *osic)
                 OBD_FREE(osic, sizeof(*osic));
 }
 
-void osic_add_one(struct obd_sync_io_container *osic)
+void osic_add_one(struct obd_sync_io_container *osic,
+                  struct osic_callback_context *occ)
 {
         unsigned long flags;
         CDEBUG(D_CACHE, "osic %p ready to roll\n", osic);
         spin_lock_irqsave(&osic->osic_lock, flags);
         osic->osic_pending++;
+        if (occ != NULL)
+                list_add_tail(&occ->occ_osic_item, &osic->osic_occ_list);
         spin_unlock_irqrestore(&osic->osic_lock, flags);
         osic_grab(osic);
 }
 
-void osic_complete_one(struct obd_sync_io_container *osic, int rc)
+void osic_complete_one(struct obd_sync_io_container *osic, 
+                       struct osic_callback_context *occ, int rc)
 {
         unsigned long flags;
         wait_queue_head_t *wake = NULL; 
         int old_rc;
 
         spin_lock_irqsave(&osic->osic_lock, flags);
+
+        if (occ != NULL)
+                list_del_init(&occ->occ_osic_item);
+
         old_rc = osic->osic_rc;
         if (osic->osic_rc == 0 && rc != 0)
                 osic->osic_rc = rc;
+
         if (--osic->osic_pending <= 0)
                 wake = &osic->osic_waitq;
+
         spin_unlock_irqrestore(&osic->osic_lock, flags);
+
         CDEBUG(D_CACHE, "osic %p completed, rc %d -> %d via %d, %d now "
                         "pending (racey)\n", osic, old_rc, osic->osic_rc, rc, 
                         osic->osic_pending);
@@ -686,12 +698,42 @@ static int osic_done(struct obd_sync_io_container *osic)
         return rc;
 }
 
+static void interrupted_osic(void *data)
+{
+        struct obd_sync_io_container *osic = data;
+        struct list_head *pos;
+        struct osic_callback_context *occ;
+        unsigned long flags;
+
+        spin_lock_irqsave(&osic->osic_lock, flags);
+        list_for_each(pos, &osic->osic_occ_list) {
+                occ = list_entry(pos, struct osic_callback_context, 
+                                 occ_osic_item);
+                occ->occ_interrupted(occ);
+        }
+        spin_unlock_irqrestore(&osic->osic_lock, flags);
+}
+
 int osic_wait(struct obd_sync_io_container *osic)
 {
-        struct l_wait_info lwi = LWI_INTR(NULL, NULL);
+        struct l_wait_info lwi = LWI_INTR(interrupted_osic, osic);
+        int rc;
 
         CDEBUG(D_CACHE, "waiting for osic %p\n", osic);
-        l_wait_event(osic->osic_waitq, osic_done(osic), &lwi);
+
+        do {
+                rc = l_wait_event(osic->osic_waitq, osic_done(osic), &lwi);
+                LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
+                /* we can't continue until the osic has emptied and stopped
+                 * referencing state that the caller will free upon return */
+                if (rc == -EINTR)
+                        lwi = (struct l_wait_info){ 0, };
+        } while (rc == -EINTR);
+
+        LASSERTF(osic->osic_pending == 0, 
+                 "exiting osic_wait(osic = %p) with %d pending\n", osic,
+                 osic->osic_pending);
+
         CDEBUG(D_CACHE, "done waiting on osic %p\n", osic);
         return osic->osic_rc;
 }
index f72ea52..d78c8bf 100644 (file)
@@ -33,7 +33,12 @@ struct osc_async_page {
         obd_flag                oap_brw_flags;
         enum async_flags        oap_async_flags;
 
+        unsigned long           oap_interrupted:1;
         struct obd_sync_io_container *oap_osic;
+        struct osic_callback_context oap_occ;
+        struct ptlrpc_request   *oap_request;
+        struct client_obd       *oap_cli;
+        struct lov_oinfo        *oap_loi;
 
        struct obd_async_page_ops *oap_caller_ops;
         void                   *oap_caller_data;
index 39b439c..d33939e 100644 (file)
@@ -1090,15 +1090,74 @@ static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa,
 static void osc_check_rpcs(struct client_obd *cli);
 static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap);
 static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
+static void lop_update_pending(struct client_obd *cli,
+                               struct loi_oap_pages *lop, int cmd, int delta);
+
+/* this is called when a sync waiter receives an interruption.  Its job is to
+ * get the caller woken as soon as possible.  If its page hasn't been put in an
+ * rpc yet it can dequeue immediately.  Otherwise it has to mark the rpc as
+ * desiring interruption which will forcefully complete the rpc once the rpc
+ * has timed out */
+static void osc_occ_interrupted(struct osic_callback_context *occ)
+{
+        struct osc_async_page *oap;
+        struct loi_oap_pages *lop;
+        struct lov_oinfo *loi;
+        ENTRY;
 
+        /* XXX member_of() */
+        oap = list_entry(occ, struct osc_async_page, oap_occ);
+
+        spin_lock(&oap->oap_cli->cl_loi_list_lock);
+
+        oap->oap_interrupted = 1;
+
+        /* ok, it's been put in an rpc. */
+        if (oap->oap_request != NULL) {
+                ptlrpc_mark_interrupted(oap->oap_request);
+                ptlrpcd_wake();
+                GOTO(unlock, 0);
+        }
+
+        /* we don't get interruption callbacks until osc_trigger_sync_io()
+         * has been called and put the sync oaps in the pending/urgent lists.*/
+        if (!list_empty(&oap->oap_pending_item)) {
+                list_del_init(&oap->oap_pending_item);
+                if (oap->oap_async_flags & ASYNC_URGENT)
+                        list_del_init(&oap->oap_urgent_item);
+
+                loi = oap->oap_loi;
+                lop = (oap->oap_cmd == OBD_BRW_WRITE) ? 
+                        &loi->loi_write_lop : &loi->loi_read_lop;
+                lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
+                loi_list_maint(oap->oap_cli, oap->oap_loi);
+
+                osic_complete_one(oap->oap_osic, &oap->oap_occ, 0);
+                oap->oap_osic = NULL;
+
+        }
+
+unlock:
+        spin_unlock(&oap->oap_cli->cl_loi_list_lock);
+}
+
+/* this must be called holding the list lock to give coverage to exit_cache, 
+ * async_flag maintenance, and oap_request */
 static void osc_complete_oap(struct client_obd *cli,
                              struct osc_async_page *oap, int rc)
 {
         ENTRY;
         osc_exit_cache(cli, oap);
         oap->oap_async_flags = 0;
+        oap->oap_interrupted = 0;
+
+        if (oap->oap_request != NULL) {
+                ptlrpc_req_finished(oap->oap_request);
+                oap->oap_request = NULL;
+        }
+
         if (oap->oap_osic) {
-                osic_complete_one(oap->oap_osic, rc);
+                osic_complete_one(oap->oap_osic, &oap->oap_occ, rc);
                 oap->oap_osic = NULL;
                 EXIT;
                 return;
@@ -1339,6 +1398,16 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                         oap = list_entry(pos, struct osc_async_page,
                                          oap_rpc_item);
                         list_del_init(&oap->oap_rpc_item);
+
+                        /* queued sync pages can be torn down while the pages
+                         * were between the pending list and the rpc */
+                        if (oap->oap_interrupted) {
+                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
+                                osc_complete_oap(cli, oap, oap->oap_count);
+                                continue;
+                        }
+
+                        /* put the page back in the loi/lop lists */
                         list_add_tail(&oap->oap_pending_item,
                                       &lop->lop_pending);
                         lop_update_pending(cli, lop, cmd, 1);
@@ -1356,22 +1425,34 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
         list_splice(&rpc_list, &aa->aa_oaps);
         INIT_LIST_HEAD(&rpc_list);
 
-        if (cmd == OBD_BRW_READ)
+        if (cmd == OBD_BRW_READ) {
                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
-        else 
-                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
-
-        spin_lock(&cli->cl_loi_list_lock);
-        if (cmd == OBD_BRW_READ)
                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_brw_in_flight);
-        else 
+        } else {
+                lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, 
                                  cli->cl_brw_in_flight);
+        }
+
+        spin_lock(&cli->cl_loi_list_lock);
 
         cli->cl_brw_in_flight++;
+        /* queued sync pages can be torn down while the pages
+         * were between the pending list and the rpc */
+        list_for_each(pos, &aa->aa_oaps) {
+                oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
+                if (oap->oap_interrupted) {
+                        CDEBUG(D_INODE, "oap %p in req %p interrupted\n", 
+                               oap, request);
+                        ptlrpc_mark_interrupted(request);
+                        break;
+                }
+        }
+
         CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %d in flight\n", request,
                page_count, aa, cli->cl_brw_in_flight);
 
+        oap->oap_request = ptlrpc_request_addref(request);
         request->rq_interpret_reply = brw_interpret_oap;
         ptlrpcd_add_req(request);
         RETURN(1);
@@ -1628,6 +1709,9 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
                 return -ENOMEM;
 
         oap->oap_magic = OAP_MAGIC;
+        oap->oap_cli = &exp->exp_obd->u.cli;
+        oap->oap_loi = loi;
+
         oap->oap_caller_ops = ops;
         oap->oap_caller_data = data;
 
@@ -1638,6 +1722,8 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
         INIT_LIST_HEAD(&oap->oap_urgent_item);
         INIT_LIST_HEAD(&oap->oap_rpc_item);
 
+        oap->oap_occ.occ_interrupted = osc_occ_interrupted;
+
         CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
         *res = oap;
         RETURN(0);
@@ -1808,7 +1894,7 @@ static int osc_queue_sync_io(struct obd_export *exp, struct lov_stripe_md *lsm,
 
         list_add_tail(&oap->oap_pending_item, &lop->lop_pending_sync);
         oap->oap_osic = osic;
-        osic_add_one(osic);
+        osic_add_one(osic, &oap->oap_occ);
 
         LOI_DEBUG(loi, "oap %p page %p on sync pending\n", oap, oap->oap_page);
 
@@ -1885,11 +1971,11 @@ static int osc_teardown_async_page(struct obd_export *exp,
 
         spin_lock(&cli->cl_loi_list_lock);
 
-        osc_exit_cache(cli, oap);
-
         if (!list_empty(&oap->oap_rpc_item))
                 GOTO(out, rc = -EBUSY);
 
+        osc_exit_cache(cli, oap);
+
         if (!list_empty(&oap->oap_urgent_item)) {
                 list_del_init(&oap->oap_urgent_item);
                 oap->oap_async_flags &= ~ASYNC_URGENT;
@@ -1903,7 +1989,8 @@ static int osc_teardown_async_page(struct obd_export *exp,
         LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
 out:
         spin_unlock(&cli->cl_loi_list_lock);
-        OBD_FREE(oap, sizeof(*oap));
+        if (rc == 0)
+                OBD_FREE(oap, sizeof(*oap));
         RETURN(rc);
 }
 
index 57773f6..6f3ae1b 100644 (file)
@@ -662,7 +662,12 @@ int ptlrpc_check_set(struct ptlrpc_request_set *set)
                         GOTO(interpret, req->rq_status);
                 }
 
-                if (req->rq_intr) {
+                /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
+                 * will only be set after rq_timedout, but the osic waiting
+                 * path sets rq_intr irrespective of whether ptlrpcd has
+                 * seen a timeout.  our policy is to only interpret 
+                 * interrupted rpcs after they have timed out */
+                if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
                         /* NB could be on delayed list */
                         ptlrpc_unregister_reply(req);
                         req->rq_status = -EINTR;
@@ -870,11 +875,18 @@ int ptlrpc_expired_set(void *data)
         RETURN(1);
 }
 
+void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
+{
+        unsigned long flags;
+        spin_lock_irqsave(&req->rq_lock, flags);
+        req->rq_intr = 1;
+        spin_unlock_irqrestore(&req->rq_lock, flags);
+}
+
 void ptlrpc_interrupted_set(void *data)
 {
         struct ptlrpc_request_set *set = data;
         struct list_head *tmp;
-        unsigned long flags;
 
         LASSERT(set != NULL);
         CERROR("INTERRUPTED SET %p\n", set);
@@ -886,9 +898,7 @@ void ptlrpc_interrupted_set(void *data)
                 if (req->rq_phase != RQ_PHASE_RPC)
                         continue;
 
-                spin_lock_irqsave (&req->rq_lock, flags);
-                req->rq_intr = 1;
-                spin_unlock_irqrestore (&req->rq_lock, flags);
+                ptlrpc_mark_interrupted(req);
         }
 }
 
index 492d108..95750b2 100644 (file)
@@ -126,6 +126,7 @@ EXPORT_SYMBOL(ptlrpc_check_set);
 EXPORT_SYMBOL(ptlrpc_set_wait);
 EXPORT_SYMBOL(ptlrpc_expired_set);
 EXPORT_SYMBOL(ptlrpc_interrupted_set);
+EXPORT_SYMBOL(ptlrpc_mark_interrupted);
 
 /* service.c */
 EXPORT_SYMBOL(ptlrpc_init_svc);
@@ -192,6 +193,7 @@ EXPORT_SYMBOL(ptlrpc_pinger_sending_on_import);
 EXPORT_SYMBOL(ptlrpcd_addref);
 EXPORT_SYMBOL(ptlrpcd_decref);
 EXPORT_SYMBOL(ptlrpcd_add_req);
+EXPORT_SYMBOL(ptlrpcd_wake);
 
 /* lproc_ptlrpc.c */
 EXPORT_SYMBOL(ptlrpc_lprocfs_register_obd);
index b7b9700..aff00b8 100644 (file)
@@ -67,12 +67,18 @@ static struct ptlrpcd_ctl {
 static DECLARE_MUTEX(ptlrpcd_sem);
 static int ptlrpcd_users = 0;
 
+void ptlrpcd_wake(void)
+{
+        struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+        wake_up(&pc->pc_waitq);
+}
+
 void ptlrpcd_add_req(struct ptlrpc_request *req)
 {
         struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
 
         ptlrpc_set_add_new_req(pc->pc_set, req);
-        wake_up(&pc->pc_waitq);
+        ptlrpcd_wake();
 }
 
 static int ptlrpcd_check(struct ptlrpcd_ctl *pc)