From 8546bd6cc475a20b4f529aae280789f50ab9e3fb Mon Sep 17 00:00:00 2001 From: green Date: Wed, 7 Dec 2005 14:38:59 +0000 Subject: [PATCH] b=9297 r=adilger Stop sending data to evicted clients as soon as possible. --- lustre/include/linux/lustre_lib.h | 39 ++++++++++++++++++++++++++++++++------- lustre/ost/ost_handler.c | 37 ++++++++++++++++++++++++++++--------- lustre/ptlrpc/niobuf.c | 4 ++++ lustre/ptlrpc/pers.c | 22 ++++++++++++++++++++++ lustre/ptlrpc/ptlrpc_internal.h | 1 + 5 files changed, 87 insertions(+), 16 deletions(-) diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 8dfbb60..74b99091 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -514,12 +514,21 @@ static inline int ll_insecure_random_int(void) * SIGNALS. The caller must therefore beware that if 'timeout' is zero, or if * 'timeout_handler' is not NULL and returns FALSE, then the ONLY thing that * can unblock the current process is 'condition' becoming TRUE. + * + * Another form of usage is: + * struct l_wait_info lwi = LWI_TIMEOUT_INTERVAL(timeout, interval, + * timeout_handler); + * rc = l_wait_event(waitq, condition, &lwi); + * This is the same as previous case, but condition is checked once every + * 'interval' jiffies (if non-zero). + * */ #define LWI_ON_SIGNAL_NOOP ((void (*)(void *))(-1)) struct l_wait_info { long lwi_timeout; + long lwi_interval; int (*lwi_on_timeout)(void *); void (*lwi_on_signal)(void *); void *lwi_cb_data; @@ -530,15 +539,26 @@ struct l_wait_info { ((struct l_wait_info) { \ .lwi_timeout = time, \ .lwi_on_timeout = cb, \ - .lwi_cb_data = data \ + .lwi_cb_data = data, \ + .lwi_interval = 0 \ }) +#define LWI_TIMEOUT_INTERVAL(time, interval, cb, data) \ +((struct l_wait_info) { \ + .lwi_timeout = time, \ + .lwi_on_timeout = cb, \ + .lwi_cb_data = data, \ + .lwi_interval = interval \ +}) + + #define LWI_TIMEOUT_INTR(time, time_cb, sig_cb, data) \ ((struct l_wait_info) { \ .lwi_timeout = time, \ .lwi_on_timeout = time_cb, \ .lwi_on_signal = (sig_cb == NULL) ? LWI_ON_SIGNAL_NOOP : sig_cb, \ - .lwi_cb_data = data \ + .lwi_cb_data = data, \ + .lwi_interval = 0 \ }) #define LWI_INTR(cb, data) LWI_TIMEOUT_INTR(0, NULL, cb, data) @@ -565,7 +585,7 @@ static inline sigset_t l_w_e_set_sigs(int sigs) #define __l_wait_event(wq, condition, info, ret, excl) \ do { \ wait_queue_t __wait; \ - signed long __timeout = info->lwi_timeout; \ + unsigned long __timeout = info->lwi_timeout; \ unsigned long __irqflags; \ sigset_t __blocked; \ \ @@ -591,11 +611,15 @@ do { \ if (condition) \ break; \ \ - if (__timeout == 0) { \ + if (__timeout) { \ schedule(); \ } else { \ - __timeout = schedule_timeout(__timeout); \ - if (__timeout == 0) { \ + unsigned long interval = info->lwi_interval? \ + min_t(unsigned long, \ + info->lwi_interval,__timeout):\ + __timeout; \ + __timeout -= interval + schedule_timeout(interval); \ + if (__timeout) { \ if (info->lwi_on_timeout == NULL || \ info->lwi_on_timeout(info->lwi_cb_data)) { \ ret = -ETIMEDOUT; \ @@ -657,7 +681,8 @@ do { \ __then = time(NULL); \ \ while (!(condition)) { \ - if (liblustre_wait_event(__timeout)) { \ + if (liblustre_wait_event(info->lwi_interval?:__timeout) || \ + (info->lwi_interval && info->lwi_interval < __timeout)) {\ if (__timeout != 0 && info->lwi_timeout != 0) { \ __now = time(NULL); \ __timeout -= __now - __then; \ diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index e621936..e5f7cdb 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -697,17 +697,27 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) } } + /* Check if client was evicted while we were doing i/o before touching + network */ if (rc == 0) { - rc = ptlrpc_start_bulk_transfer(desc); + if (desc->bd_export->exp_failed) + rc = -ENOTCONN; + else + rc = ptlrpc_start_bulk_transfer(desc); if (rc == 0) { - lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, - ost_bulk_timeout, desc); + lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ, + ost_bulk_timeout, desc); rc = l_wait_event(desc->bd_waitq, - !ptlrpc_bulk_active(desc), &lwi); + !ptlrpc_bulk_active(desc) || + desc->bd_export->exp_failed, &lwi); LASSERT(rc == 0 || rc == -ETIMEDOUT); if (rc == -ETIMEDOUT) { DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT"); ptlrpc_abort_bulk(desc); + } else if (desc->bd_export->exp_failed) { + DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT"); + rc = -ENOTCONN; + ptlrpc_abort_bulk(desc); } else if (!desc->bd_success || desc->bd_nob_transferred != desc->bd_nob) { DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)", @@ -900,16 +910,25 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) pp_rnb[i].offset & (PAGE_SIZE - 1), pp_rnb[i].len); - rc = ptlrpc_start_bulk_transfer (desc); + /* Check if client was evicted while we were doing i/o before touching + network */ + if (desc->bd_export->exp_failed) + rc = -ENOTCONN; + else + rc = ptlrpc_start_bulk_transfer (desc); if (rc == 0) { - lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, - ost_bulk_timeout, desc); - rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), - &lwi); + lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ, + ost_bulk_timeout, desc); + rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) || + desc->bd_export->exp_failed, &lwi); LASSERT(rc == 0 || rc == -ETIMEDOUT); if (rc == -ETIMEDOUT) { DEBUG_REQ(D_ERROR, req, "timeout on bulk GET"); ptlrpc_abort_bulk(desc); + } else if (desc->bd_export->exp_failed) { + DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET"); + rc = -ENOTCONN; + ptlrpc_abort_bulk(desc); } else if (!desc->bd_success || desc->bd_nob_transferred != desc->bd_nob) { DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)", diff --git a/lustre/ptlrpc/niobuf.c b/lustre/ptlrpc/niobuf.c index e1b3219..c63c971 100644 --- a/lustre/ptlrpc/niobuf.c +++ b/lustre/ptlrpc/niobuf.c @@ -163,6 +163,10 @@ void ptlrpc_abort_bulk (struct ptlrpc_bulk_desc *desc) if (!ptlrpc_bulk_active(desc)) /* completed or */ return; /* never started */ + /* Do not send any meaningful data over the wire for evicted clients */ + if (desc->bd_export && desc->bd_export->exp_failed) + ptl_rpc_wipe_bulk_pages(desc); + /* The unlink ensures the callback happens ASAP and is the last * one. If it fails, it must be because completion just happened, * but we must still l_wait_event() in this case, to give liblustre diff --git a/lustre/ptlrpc/pers.c b/lustre/ptlrpc/pers.c index b09fb8c..5dfbe85 100644 --- a/lustre/ptlrpc/pers.c +++ b/lustre/ptlrpc/pers.c @@ -61,6 +61,18 @@ void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page, desc->bd_iov_count++; } +void ptl_rpc_wipe_bulk_pages(struct ptlrpc_bulk_desc *desc) +{ + int i; + + for (i = 0; i < desc->bd_iov_count ; i++) { + lnet_kiov_t *kiov = &desc->bd_iov[i]; + memset(kmap(kiov->kiov_page)+kiov->kiov_offset, 0xab, + kiov->kiov_len); + kunmap(kiov->kiov_page); + } +} + #else /* !__KERNEL__ */ void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc) @@ -105,4 +117,14 @@ void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page, } } +void ptl_rpc_wipe_bulk_pages(struct ptlrpc_bulk_desc *desc) +{ + int i; + + for(i = 0; i < desc->bd_iov_count; i++) { + lnet_md_iovec_t *iov = &desc->bd_iov[i]; + + memset(iov->iov_base, 0xab, iov->iov_len); + } +} #endif /* !__KERNEL__ */ diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h index 0ce3872..be8c52c 100644 --- a/lustre/ptlrpc/ptlrpc_internal.h +++ b/lustre/ptlrpc/ptlrpc_internal.h @@ -106,6 +106,7 @@ int ptlrpc_expire_one_request(struct ptlrpc_request *req); void ptlrpc_fill_bulk_md(lnet_md_t *md, struct ptlrpc_bulk_desc *desc); void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, struct page *page, int pageoffset, int len); +void ptl_rpc_wipe_bulk_pages(struct ptlrpc_bulk_desc *desc); /* pinger.c */ int ptlrpc_start_pinger(void); -- 1.8.3.1