int ptlrpc_set_wait(struct ptlrpc_request_set *);
int ptlrpc_expired_set(void *data);
void ptlrpc_interrupted_set(void *data);
+void ptlrpc_mark_interrupted(struct ptlrpc_request *req);
void ptlrpc_set_destroy(struct ptlrpc_request_set *);
void ptlrpc_set_add_req(struct ptlrpc_request_set *, struct ptlrpc_request *);
void ptlrpc_set_add_new_req(struct ptlrpc_request_set *,
int ptlrpc_pinger_del_import(struct obd_import *imp);
/* ptlrpc/ptlrpcd.c */
+void ptlrpcd_wake(void);
void ptlrpcd_add_req(struct ptlrpc_request *req);
int ptlrpcd_addref(void);
void ptlrpcd_decref(void);
void (*ap_completion)(void *data, int cmd, int rc);
};
+/* the `osic' is passed down from a caller of the obd rw methods. the callee
+ * records enough state in it that the caller can sleep on the osic and
+ * be woken once all of the callees have finished their work */
struct obd_sync_io_container {
spinlock_t osic_lock;
atomic_t osic_refcount;
int osic_pending;
int osic_rc;
+ struct list_head osic_occ_list;
wait_queue_head_t osic_waitq;
};
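+
+/* an illustrative caller-side usage sketch; obd_brw_sync() and its arguments
+ * below are hypothetical stand-ins for an obd rw method that takes an osic.
+ * the caller creates the osic, hands it down, sleeps in osic_wait() until
+ * every callee has called osic_complete_one(), and then drops its reference:
+ *
+ *   struct obd_sync_io_container *osic;
+ *   int rc;
+ *
+ *   osic_init(&osic);
+ *   rc = obd_brw_sync(exp, lsm, oa, pages, page_count, osic);
+ *   if (rc == 0)
+ *           rc = osic_wait(osic);
+ *   osic_release(osic);
+ */
+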
+/* the osic callback context lets the callee of obd rw methods register
+ * for callbacks from the caller. */
+struct osic_callback_context {
+ struct list_head occ_osic_item;
+ /* called when the caller has received a signal while sleeping.
+ * implementers of this callback are encouraged to abort their
+ * state in the osic. This may be called multiple times. */
+ void (*occ_interrupted)(struct osic_callback_context *occ);
+};
+
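+/* an illustrative callee-side sketch; struct my_sync_item, its msi_ fields,
+ * and my_interrupted() are hypothetical. the callee embeds an
+ * osic_callback_context in its per-item state, registers it with
+ * osic_add_one(), and pairs that with exactly one osic_complete_one() when
+ * the item finishes or is aborted:
+ *
+ *   struct my_sync_item {
+ *           struct osic_callback_context  msi_occ;
+ *           struct obd_sync_io_container *msi_osic;
+ *   };
+ *
+ *   static void my_interrupted(struct osic_callback_context *occ)
+ *   {
+ *           struct my_sync_item *msi = list_entry(occ,
+ *                           struct my_sync_item, msi_occ);
+ *           ... abort the item; whichever path actually tears it down
+ *               calls osic_complete_one(msi->msi_osic, &msi->msi_occ, rc) ...
+ *   }
+ *
+ *   msi->msi_occ.occ_interrupted = my_interrupted;
+ *   msi->msi_osic = osic;
+ *   osic_add_one(osic, &msi->msi_occ);
+ */
+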
/* if we find more consumers this could be generalized */
#define OBD_HIST_MAX 32
struct obd_histogram {
struct obd_uuid *grp_uuid);
void osic_init(struct obd_sync_io_container **osic);
-void osic_add_one(struct obd_sync_io_container *osic);
-void osic_complete_one(struct obd_sync_io_container *osic, int rc);
+void osic_add_one(struct obd_sync_io_container *osic,
+ struct osic_callback_context *occ);
+void osic_complete_one(struct obd_sync_io_container *osic,
+ struct osic_callback_context *occ, int rc);
void osic_release(struct obd_sync_io_container *osic);
int osic_wait(struct obd_sync_io_container *osic);
osic->osic_pending = 0;
atomic_set(&osic->osic_refcount, 1);
init_waitqueue_head(&osic->osic_waitq);
+ INIT_LIST_HEAD(&osic->osic_occ_list);
*osic_out = osic;
}
OBD_FREE(osic, sizeof(*osic));
}
-void osic_add_one(struct obd_sync_io_container *osic)
+void osic_add_one(struct obd_sync_io_container *osic,
+ struct osic_callback_context *occ)
{
unsigned long flags;
CDEBUG(D_CACHE, "osic %p ready to roll\n", osic);
spin_lock_irqsave(&osic->osic_lock, flags);
osic->osic_pending++;
+ if (occ != NULL)
+ list_add_tail(&occ->occ_osic_item, &osic->osic_occ_list);
spin_unlock_irqrestore(&osic->osic_lock, flags);
osic_grab(osic);
}
-void osic_complete_one(struct obd_sync_io_container *osic, int rc)
+void osic_complete_one(struct obd_sync_io_container *osic,
+ struct osic_callback_context *occ, int rc)
{
unsigned long flags;
wait_queue_head_t *wake = NULL;
int old_rc;
spin_lock_irqsave(&osic->osic_lock, flags);
+
+ if (occ != NULL)
+ list_del_init(&occ->occ_osic_item);
+
old_rc = osic->osic_rc;
if (osic->osic_rc == 0 && rc != 0)
osic->osic_rc = rc;
+
if (--osic->osic_pending <= 0)
wake = &osic->osic_waitq;
+
spin_unlock_irqrestore(&osic->osic_lock, flags);
+
CDEBUG(D_CACHE, "osic %p completed, rc %d -> %d via %d, %d now "
"pending (racey)\n", osic, old_rc, osic->osic_rc, rc,
osic->osic_pending);
if (wake != NULL)
        wake_up(wake);
}
+static void interrupted_osic(void *data)
+{
+ struct obd_sync_io_container *osic = data;
+ struct list_head *pos;
+ struct osic_callback_context *occ;
+ unsigned long flags;
+
+ spin_lock_irqsave(&osic->osic_lock, flags);
+ list_for_each(pos, &osic->osic_occ_list) {
+ occ = list_entry(pos, struct osic_callback_context,
+ occ_osic_item);
+ occ->occ_interrupted(occ);
+ }
+ spin_unlock_irqrestore(&osic->osic_lock, flags);
+}
+
int osic_wait(struct obd_sync_io_container *osic)
{
- struct l_wait_info lwi = LWI_INTR(NULL, NULL);
+ struct l_wait_info lwi = LWI_INTR(interrupted_osic, osic);
+ int rc;
CDEBUG(D_CACHE, "waiting for osic %p\n", osic);
- l_wait_event(osic->osic_waitq, osic_done(osic), &lwi);
+
+ do {
+ rc = l_wait_event(osic->osic_waitq, osic_done(osic), &lwi);
+ LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
+ /* we can't continue until the osic has emptied and stopped
+ * referencing state that the caller will free upon return */
+ if (rc == -EINTR)
+ lwi = (struct l_wait_info){ 0, };
+ } while (rc == -EINTR);
+
+ LASSERTF(osic->osic_pending == 0,
+ "exiting osic_wait(osic = %p) with %d pending\n", osic,
+ osic->osic_pending);
+
CDEBUG(D_CACHE, "done waiting on osic %p\n", osic);
return osic->osic_rc;
}
obd_flag oap_brw_flags;
enum async_flags oap_async_flags;
+ unsigned long oap_interrupted:1;
struct obd_sync_io_container *oap_osic;
+ struct osic_callback_context oap_occ;
+ struct ptlrpc_request *oap_request;
+ struct client_obd *oap_cli;
+ struct lov_oinfo *oap_loi;
struct obd_async_page_ops *oap_caller_ops;
void *oap_caller_data;
static void osc_check_rpcs(struct client_obd *cli);
static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap);
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
+static void lop_update_pending(struct client_obd *cli,
+ struct loi_oap_pages *lop, int cmd, int delta);
+
+/* this is called when a sync waiter receives an interruption. Its job is to
+ * get the caller woken as soon as possible. If its page hasn't been put in an
+ * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
+ * desiring interruption, which forces the rpc to be completed once it has
+ * timed out */
+static void osc_occ_interrupted(struct osic_callback_context *occ)
+{
+ struct osc_async_page *oap;
+ struct loi_oap_pages *lop;
+ struct lov_oinfo *loi;
+ ENTRY;
+ /* XXX member_of() */
+ oap = list_entry(occ, struct osc_async_page, oap_occ);
+
+ spin_lock(&oap->oap_cli->cl_loi_list_lock);
+
+ oap->oap_interrupted = 1;
+
+ /* if it's already been put in an rpc, interrupt that rpc */
+ if (oap->oap_request != NULL) {
+ ptlrpc_mark_interrupted(oap->oap_request);
+ ptlrpcd_wake();
+ GOTO(unlock, 0);
+ }
+
+ /* we don't get interruption callbacks until osc_trigger_sync_io()
+ * has been called and put the sync oaps in the pending/urgent lists.*/
+ if (!list_empty(&oap->oap_pending_item)) {
+ list_del_init(&oap->oap_pending_item);
+ if (oap->oap_async_flags & ASYNC_URGENT)
+ list_del_init(&oap->oap_urgent_item);
+
+ loi = oap->oap_loi;
+ lop = (oap->oap_cmd == OBD_BRW_WRITE) ?
+ &loi->loi_write_lop : &loi->loi_read_lop;
+ lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
+ loi_list_maint(oap->oap_cli, oap->oap_loi);
+
+ osic_complete_one(oap->oap_osic, &oap->oap_occ, 0);
+ oap->oap_osic = NULL;
+
+ }
+
+unlock:
+ spin_unlock(&oap->oap_cli->cl_loi_list_lock);
+}
+
+/* this must be called holding the list lock to give coverage to exit_cache,
+ * async_flag maintenance, and oap_request */
static void osc_complete_oap(struct client_obd *cli,
struct osc_async_page *oap, int rc)
{
ENTRY;
osc_exit_cache(cli, oap);
oap->oap_async_flags = 0;
+ oap->oap_interrupted = 0;
+
+ if (oap->oap_request != NULL) {
+ ptlrpc_req_finished(oap->oap_request);
+ oap->oap_request = NULL;
+ }
+
if (oap->oap_osic) {
- osic_complete_one(oap->oap_osic, rc);
+ osic_complete_one(oap->oap_osic, &oap->oap_occ, rc);
oap->oap_osic = NULL;
EXIT;
return;
oap = list_entry(pos, struct osc_async_page,
oap_rpc_item);
list_del_init(&oap->oap_rpc_item);
+
+ /* queued sync pages may have been torn down while they were
+ * between the pending list and the rpc */
+ if (oap->oap_interrupted) {
+ CDEBUG(D_INODE, "oap %p interrupted\n", oap);
+ osc_complete_oap(cli, oap, oap->oap_count);
+ continue;
+ }
+
+ /* put the page back in the loi/lop lists */
list_add_tail(&oap->oap_pending_item,
&lop->lop_pending);
lop_update_pending(cli, lop, cmd, 1);
list_splice(&rpc_list, &aa->aa_oaps);
INIT_LIST_HEAD(&rpc_list);
- if (cmd == OBD_BRW_READ)
+ if (cmd == OBD_BRW_READ) {
lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
- else
- lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
-
- spin_lock(&cli->cl_loi_list_lock);
- if (cmd == OBD_BRW_READ)
lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_brw_in_flight);
- else
+ } else {
+ lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
lprocfs_oh_tally(&cli->cl_write_rpc_hist,
cli->cl_brw_in_flight);
+ }
+
+ spin_lock(&cli->cl_loi_list_lock);
cli->cl_brw_in_flight++;
+ /* queued sync pages may have been torn down while they were
+ * between the pending list and the rpc */
+ list_for_each(pos, &aa->aa_oaps) {
+ oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
+ if (oap->oap_interrupted) {
+ CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
+ oap, request);
+ ptlrpc_mark_interrupted(request);
+ break;
+ }
+ }
+
CDEBUG(D_INODE, "req %p: %d pages, aa %p. now %d in flight\n", request,
page_count, aa, cli->cl_brw_in_flight);
+ oap->oap_request = ptlrpc_request_addref(request);
request->rq_interpret_reply = brw_interpret_oap;
ptlrpcd_add_req(request);
RETURN(1);
return -ENOMEM;
oap->oap_magic = OAP_MAGIC;
+ oap->oap_cli = &exp->exp_obd->u.cli;
+ oap->oap_loi = loi;
+
oap->oap_caller_ops = ops;
oap->oap_caller_data = data;
INIT_LIST_HEAD(&oap->oap_urgent_item);
INIT_LIST_HEAD(&oap->oap_rpc_item);
+ oap->oap_occ.occ_interrupted = osc_occ_interrupted;
+
CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
*res = oap;
RETURN(0);
list_add_tail(&oap->oap_pending_item, &lop->lop_pending_sync);
oap->oap_osic = osic;
- osic_add_one(osic);
+ osic_add_one(osic, &oap->oap_occ);
LOI_DEBUG(loi, "oap %p page %p on sync pending\n", oap, oap->oap_page);
spin_lock(&cli->cl_loi_list_lock);
- osc_exit_cache(cli, oap);
-
if (!list_empty(&oap->oap_rpc_item))
GOTO(out, rc = -EBUSY);
+ osc_exit_cache(cli, oap);
+
if (!list_empty(&oap->oap_urgent_item)) {
list_del_init(&oap->oap_urgent_item);
oap->oap_async_flags &= ~ASYNC_URGENT;
LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
out:
spin_unlock(&cli->cl_loi_list_lock);
- OBD_FREE(oap, sizeof(*oap));
+ if (rc == 0)
+ OBD_FREE(oap, sizeof(*oap));
RETURN(rc);
}
GOTO(interpret, req->rq_status);
}
- if (req->rq_intr) {
+ /* ptlrpc_queue_wait->l_wait_event guarantees that rq_intr
+ * will only be set after rq_timedout, but the osic waiting
+ * path sets rq_intr irrespective of whether ptlrpcd has
+ * seen a timeout. our policy is to only interpret
+ * interrupted rpcs after they have timed out */
+ if (req->rq_intr && (req->rq_timedout || req->rq_waiting)) {
/* NB could be on delayed list */
ptlrpc_unregister_reply(req);
req->rq_status = -EINTR;
RETURN(1);
}
+void ptlrpc_mark_interrupted(struct ptlrpc_request *req)
+{
+ unsigned long flags;
+ spin_lock_irqsave(&req->rq_lock, flags);
+ req->rq_intr = 1;
+ spin_unlock_irqrestore(&req->rq_lock, flags);
+}
+
void ptlrpc_interrupted_set(void *data)
{
struct ptlrpc_request_set *set = data;
struct list_head *tmp;
- unsigned long flags;
LASSERT(set != NULL);
CERROR("INTERRUPTED SET %p\n", set);
if (req->rq_phase != RQ_PHASE_RPC)
continue;
- spin_lock_irqsave (&req->rq_lock, flags);
- req->rq_intr = 1;
- spin_unlock_irqrestore (&req->rq_lock, flags);
+ ptlrpc_mark_interrupted(req);
}
}
EXPORT_SYMBOL(ptlrpc_set_wait);
EXPORT_SYMBOL(ptlrpc_expired_set);
EXPORT_SYMBOL(ptlrpc_interrupted_set);
+EXPORT_SYMBOL(ptlrpc_mark_interrupted);
/* service.c */
EXPORT_SYMBOL(ptlrpc_init_svc);
EXPORT_SYMBOL(ptlrpcd_addref);
EXPORT_SYMBOL(ptlrpcd_decref);
EXPORT_SYMBOL(ptlrpcd_add_req);
+EXPORT_SYMBOL(ptlrpcd_wake);
/* lproc_ptlrpc.c */
EXPORT_SYMBOL(ptlrpc_lprocfs_register_obd);
static DECLARE_MUTEX(ptlrpcd_sem);
static int ptlrpcd_users = 0;
+void ptlrpcd_wake(void)
+{
+ struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
+ wake_up(&pc->pc_waitq);
+}
+
void ptlrpcd_add_req(struct ptlrpc_request *req)
{
struct ptlrpcd_ctl *pc = &ptlrpcd_pc;
ptlrpc_set_add_new_req(pc->pc_set, req);
- wake_up(&pc->pc_waitq);
+ ptlrpcd_wake();
}
static int ptlrpcd_check(struct ptlrpcd_ctl *pc)