From ae36c7d8afedc46e22216ffeb0ef282729308983 Mon Sep 17 00:00:00 2001 From: vitaly Date: Tue, 18 Nov 2008 21:40:12 +0000 Subject: [PATCH] Branch b1_8_gate b=16129 i=adilger i=green - a high priority request list is added into service; - once a lock is canceled, all the IO requests, including coming ones, under this lock, are moved into this list; - PING is also added into this list; - once a lock cancel timeout occurs, the timeout is prolonged if there is an IO rpc under this lock; - another request list is added into the export, used to speed up the rpc-lock matching. --- lustre/ost/ost_handler.c | 410 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 306 insertions(+), 104 deletions(-) diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index e874ad8..b50f84c 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -282,10 +282,9 @@ static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req, /* check that we do support OBD_CONNECT_TRUNCLOCK. */ CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) - RETURN(-EFAULT); + /* ost_body is varified and swabbed in ost_hpreq_handler() */ + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); oinfo.oi_oa = &body->oa; oinfo.oi_policy.l_extent.start = oinfo.oi_oa->o_size; @@ -464,7 +463,9 @@ static void ost_brw_lock_put(int mode, struct ost_prolong_data { struct obd_export *opd_exp; ldlm_policy_data_t opd_policy; + struct obdo *opd_oa; ldlm_mode_t opd_mode; + int opd_lock_match; }; static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) @@ -494,6 +495,14 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) return LDLM_ITER_CONTINUE; } + /* Fill the obdo with the matched lock handle. + * XXX: it is possible in some cases the IO RPC is covered by several + * locks, even for the write case, so it may need to be a lock list. */ + if (opd->opd_oa && !(opd->opd_oa->o_valid & OBD_MD_FLHANDLE)) { + opd->opd_oa->o_handle.cookie = lock->l_handle.h_cookie; + opd->opd_oa->o_valid |= OBD_MD_FLHANDLE; + } + if (!(lock->l_flags & LDLM_FL_AST_SENT)) { /* ignore locks not being cancelled */ return LDLM_ITER_CONTINUE; @@ -502,19 +511,20 @@ static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data) /* OK. this is a possible lock the user holds doing I/O * let's refresh eviction timer for it */ ldlm_refresh_waiting_lock(lock); + opd->opd_lock_match = 1; return LDLM_ITER_CONTINUE; } -static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, - struct niobuf_remote *nb, struct obdo *oa, - ldlm_mode_t mode) +static int ost_rw_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, + struct niobuf_remote *nb, struct obdo *oa, + ldlm_mode_t mode) { struct ldlm_res_id res_id = { .name = { obj->ioo_id } }; + struct ost_prolong_data opd = { 0 }; int nrbufs = obj->ioo_bufcnt; - struct ost_prolong_data opd; ENTRY; @@ -534,16 +544,28 @@ static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, lock = ldlm_handle2lock(&oa->o_handle); if (lock != NULL) { ost_prolong_locks_iter(lock, &opd); + if (opd.opd_lock_match) { + LDLM_LOCK_PUT(lock); + RETURN(1); + } + + /* Check if the lock covers the whole IO region, + * otherwise iterate through the resource. */ + if (lock->l_policy_data.l_extent.end >= + opd.opd_policy.l_extent.end && + lock->l_policy_data.l_extent.start <= + opd.opd_policy.l_extent.start) { + LDLM_LOCK_PUT(lock); + RETURN(0); + } LDLM_LOCK_PUT(lock); - EXIT; - return; } } + opd.opd_oa = oa; ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id, ost_prolong_locks_iter, &opd); - - EXIT; + RETURN(opd.opd_lock_match); } static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) @@ -557,7 +579,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) struct l_wait_info lwi; struct lustre_handle lockh = { 0 }; __u32 size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; - int objcount, niocount, npages, nob = 0, rc, i; + int niocount, npages, nob = 0, rc, i; int no_reply = 0; ENTRY; @@ -577,50 +599,18 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) if (exp->exp_failed) GOTO(out, rc = -ENOTCONN); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR("Missing/short ost_body\n"); - GOTO(out, rc = -EFAULT); - } + /* ost_body, ioobj & noibuf_remote are verified and swabbed in + * ost_rw_hpreq_check(). */ + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); - objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / - sizeof(*ioo); - if (objcount == 0) { - CERROR("Missing/short ioobj\n"); - GOTO(out, rc = -EFAULT); - } - if (objcount > 1) { - CERROR("too many ioobjs (%d)\n", objcount); - GOTO(out, rc = -EFAULT); - } - - ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo), - lustre_swab_obd_ioobj); - if (ioo == NULL) { - CERROR("Missing/short ioobj\n"); - GOTO(out, rc = -EFAULT); - } + ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*ioo)); + LASSERT(ioo != NULL); niocount = ioo->ioo_bufcnt; - if (niocount > PTLRPC_MAX_BRW_PAGES) { - DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)", - niocount); - GOTO(out, rc = -EFAULT); - } - - remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, - niocount * sizeof(*remote_nb), - lustre_swab_niobuf_remote); - if (remote_nb == NULL) { - CERROR("Missing/short niobuf\n"); - GOTO(out, rc = -EFAULT); - } - if (lustre_req_need_swab(req)) { - /* swab remaining niobufs */ - for (i = 1; i < niocount; i++) - lustre_swab_niobuf_remote (&remote_nb[i]); - } + remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*remote_nb)); + LASSERT(remote_nb != NULL); rc = lustre_pack_reply(req, 2, size, NULL); if (rc) @@ -662,7 +652,7 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti) if (desc == NULL) /* XXX: check all cleanup stuff */ GOTO(out, rc = -ENOMEM); - ost_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR); + ost_rw_prolong_locks(exp, ioo, remote_nb, &body->oa, LCK_PW | LCK_PR); nob = 0; for (i = 0; i < npages; i++) { @@ -821,7 +811,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) __u32 *rcs; __u32 size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int objcount, niocount, npages; - int rc, swab, i, j; + int rc, i, j; obd_count client_cksum = 0, server_cksum = 0; cksum_type_t cksum_type = OBD_CKSUM_CRC32; int no_reply = 0; @@ -846,56 +836,22 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) if (exp->exp_failed) GOTO(out, rc = -ENOTCONN); - swab = lustre_req_need_swab(req); - body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), - lustre_swab_ost_body); - if (body == NULL) { - CERROR("Missing/short ost_body\n"); - GOTO(out, rc = -EFAULT); - } + /* ost_body, ioobj & noibuf_remote are verified and swabbed in + * ost_rw_hpreq_check(). */ + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / sizeof(*ioo); - if (objcount == 0) { - CERROR("Missing/short ioobj\n"); - GOTO(out, rc = -EFAULT); - } - if (objcount > 1) { - CERROR("too many ioobjs (%d)\n", objcount); - GOTO(out, rc = -EFAULT); - } - - lustre_set_req_swabbed(req, REQ_REC_OFF + 1); ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, objcount * sizeof(*ioo)); - LASSERT (ioo != NULL); - for (niocount = i = 0; i < objcount; i++) { - if (swab) - lustre_swab_obd_ioobj(&ioo[i]); - if (ioo[i].ioo_bufcnt == 0) { - CERROR("ioo[%d] has zero bufcnt\n", i); - GOTO(out, rc = -EFAULT); - } + LASSERT(ioo != NULL); + for (niocount = i = 0; i < objcount; i++) niocount += ioo[i].ioo_bufcnt; - } - if (niocount > PTLRPC_MAX_BRW_PAGES) { - DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)", - niocount); - GOTO(out, rc = -EFAULT); - } - - remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, - niocount * sizeof(*remote_nb), - lustre_swab_niobuf_remote); - if (remote_nb == NULL) { - CERROR("Missing/short niobuf\n"); - GOTO(out, rc = -EFAULT); - } - if (swab) { /* swab the remaining niobufs */ - for (i = 1; i < niocount; i++) - lustre_swab_niobuf_remote (&remote_nb[i]); - } + remote_nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*remote_nb)); + LASSERT(remote_nb != NULL); size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs); rc = lustre_pack_reply(req, 3, size, NULL); @@ -931,7 +887,7 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti) GOTO(out_lock, rc = -ETIMEDOUT); } - ost_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW); + ost_rw_prolong_locks(exp, ioo, remote_nb,&body->oa, LCK_PW); /* obd_preprw clobbers oa->valid, so save what we need */ if (body->oa.o_valid & OBD_MD_FLCKSUM) { @@ -1374,6 +1330,250 @@ int ost_msg_check_version(struct lustre_msg *msg) return rc; } +static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req, + struct ldlm_lock *lock) +{ + struct niobuf_remote *nb; + struct obd_ioobj *ioo; + struct ost_body *body; + int objcount, niocount; + int mode, opc, i; + __u64 start, end; + ENTRY; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + LASSERT(opc == OST_READ || opc == OST_WRITE); + + /* As the request may be covered by several locks, do not look at + * o_handle, look at the RPC IO region. */ + body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), + lustre_swab_obdo); + objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / + sizeof(*ioo); + ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, + objcount * sizeof(*ioo)); + LASSERT(ioo != NULL); + for (niocount = i = 0; i < objcount; i++) + niocount += ioo[i].ioo_bufcnt; + + nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*nb)); + LASSERT(nb != NULL); + + mode = LCK_PW; + if (opc == OST_READ) + mode |= LCK_PR; + + start = nb[0].offset & CFS_PAGE_MASK; + end = (nb[ioo->ioo_bufcnt - 1].offset + + nb[ioo->ioo_bufcnt - 1].len - 1) | ~CFS_PAGE_MASK; + + if (!(lock->l_granted_mode & mode)) + RETURN(0); + + if (lock->l_policy_data.l_extent.end < start || + lock->l_policy_data.l_extent.start > end) + RETURN(0); + + RETURN(1); +} + +/** + * Swab buffers needed to call ost_rw_prolong_locks() and call it. + * Return the value from ost_rw_prolong_locks() which is non-zero if + * there is a cancelled lock which is waiting for this IO request. + */ +static int ost_rw_hpreq_check(struct ptlrpc_request *req) +{ + struct niobuf_remote *nb; + struct obd_ioobj *ioo; + struct ost_body *body; + int objcount, niocount; + int mode, opc, i; + ENTRY; + + opc = lustre_msg_get_opc(req->rq_reqmsg); + LASSERT(opc == OST_READ || opc == OST_WRITE); + + body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); + + objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / + sizeof(*ioo); + ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, + objcount * sizeof(*ioo)); + LASSERT(ioo != NULL); + + for (niocount = i = 0; i < objcount; i++) + niocount += ioo[i].ioo_bufcnt; + nb = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2, + niocount * sizeof(*nb)); + LASSERT(nb != NULL); + LASSERT(niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)); + + mode = LCK_PW; + if (opc == OST_READ) + mode |= LCK_PR; + RETURN(ost_rw_prolong_locks(req->rq_export, ioo, nb, &body->oa, mode)); +} + +static int ost_punch_prolong_locks(struct obd_export *exp, struct obdo *oa) +{ + struct ldlm_res_id res_id = { .name = { oa->o_id } }; + struct ost_prolong_data opd = { 0 }; + __u64 start, end; + ENTRY; + + start = oa->o_size; + end = start + oa->o_blocks; + + opd.opd_mode = LCK_PW; + opd.opd_exp = exp; + opd.opd_policy.l_extent.start = start & CFS_PAGE_MASK; + if (oa->o_blocks == OBD_OBJECT_EOF || end < start) + opd.opd_policy.l_extent.end = OBD_OBJECT_EOF; + else + opd.opd_policy.l_extent.end = end | ~CFS_PAGE_MASK; + + CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n", + res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start, + opd.opd_policy.l_extent.end); + + opd.opd_oa = oa; + ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id, + ost_prolong_locks_iter, &opd); + RETURN(opd.opd_lock_match); +} + +static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req, + struct ldlm_lock *lock) +{ + struct ost_body *body; + ENTRY; + + body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), + lustre_swab_obdo); + LASSERT(body != NULL); + + if (body->oa.o_valid & OBD_MD_FLHANDLE && + body->oa.o_handle.cookie == lock->l_handle.h_cookie) + RETURN(1); + RETURN(0); +} + +static int ost_punch_hpreq_check(struct ptlrpc_request *req) +{ + struct ost_body *body = lustre_msg_buf(req->rq_reqmsg, + REQ_REC_OFF, sizeof(*body)); + LASSERT(body != NULL); + LASSERT(!(body->oa.o_valid & OBD_MD_FLFLAGS) || + !(body->oa.o_flags & OBD_FL_TRUNCLOCK)); + + RETURN(ost_punch_prolong_locks(req->rq_export, &body->oa)); +} + +struct ptlrpc_hpreq_ops ost_hpreq_rw = { + .hpreq_lock_match = ost_rw_hpreq_lock_match, + .hpreq_check = ost_rw_hpreq_check, +}; + +struct ptlrpc_hpreq_ops ost_hpreq_punch = { + .hpreq_lock_match = ost_punch_hpreq_lock_match, + .hpreq_check = ost_punch_hpreq_check, +}; + +/** Assign high priority operations to the request if needed. */ +static int ost_hpreq_handler(struct ptlrpc_request *req) +{ + ENTRY; + if (req->rq_export) { + int opc = lustre_msg_get_opc(req->rq_reqmsg); + struct ost_body *body; + + if (opc == OST_READ || opc == OST_WRITE) { + struct niobuf_remote *nb; + struct obd_ioobj *ioo; + int objcount, niocount; + int swab, i; + + body = lustre_swab_reqbuf(req, REQ_REC_OFF, + sizeof(*body), + lustre_swab_obdo); + if (!body) { + CERROR("Missing/short ost_body\n"); + RETURN(-EFAULT); + } + objcount = lustre_msg_buflen(req->rq_reqmsg, + REQ_REC_OFF + 1) / + sizeof(*ioo); + if (objcount == 0) { + CERROR("Missing/short ioobj\n"); + RETURN(-EFAULT); + } + if (objcount > 1) { + CERROR("too many ioobjs (%d)\n", objcount); + RETURN(-EFAULT); + } + + swab = !lustre_req_swabbed(req, REQ_REC_OFF + 1) && + lustre_req_need_swab(req); + ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, + objcount * sizeof(*ioo), + lustre_swab_obd_ioobj); + if (!ioo) { + CERROR("Missing/short ioobj\n"); + RETURN(-EFAULT); + } + for (niocount = i = 0; i < objcount; i++) { + if (i > 0 && swab) + lustre_swab_obd_ioobj(&ioo[i]); + if (ioo[i].ioo_bufcnt == 0) { + CERROR("ioo[%d] has zero bufcnt\n", i); + RETURN(-EFAULT); + } + niocount += ioo[i].ioo_bufcnt; + } + if (niocount > PTLRPC_MAX_BRW_PAGES) { + DEBUG_REQ(D_ERROR, req, "bulk has too many " + "pages (%d)", niocount); + RETURN(-EFAULT); + } + + swab = !lustre_req_swabbed(req, REQ_REC_OFF + 2) && + lustre_req_need_swab(req); + nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, + niocount * sizeof(*nb), + lustre_swab_niobuf_remote); + if (!nb) { + CERROR("Missing/short niobuf\n"); + RETURN(-EFAULT); + } + + if (swab) { + /* swab remaining niobufs */ + for (i = 1; i < niocount; i++) + lustre_swab_niobuf_remote(&nb[i]); + } + + if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)) + req->rq_ops = &ost_hpreq_rw; + } else if (opc == OST_PUNCH) { + body = lustre_swab_reqbuf(req, REQ_REC_OFF, + sizeof(*body), + lustre_swab_obdo); + if (!body) { + CERROR("Missing/short ost_body\n"); + RETURN(-EFAULT); + } + + if (!(body->oa.o_valid & OBD_MD_FLFLAGS) || + !(body->oa.o_flags & OBD_FL_TRUNCLOCK)) + req->rq_ops = &ost_hpreq_punch; + } + } + RETURN(0); +} + static int ost_handle(struct ptlrpc_request *req) { struct obd_trans_info trans_info = { 0, }; @@ -1679,7 +1879,7 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf) /* Insure a 4x range for dynamic threads */ if (oss_min_threads > OSS_THREADS_MAX / 4) oss_min_threads = OSS_THREADS_MAX / 4; - oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4); + oss_max_threads = min(OSS_THREADS_MAX, oss_min_threads * 4 + 1); } ost->ost_service = @@ -1688,7 +1888,8 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf) OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR, ost_handle, LUSTRE_OSS_NAME, obd->obd_proc_entry, target_print_req, - oss_min_threads, oss_max_threads, "ll_ost"); + oss_min_threads, oss_max_threads, "ll_ost", + NULL); if (ost->ost_service == NULL) { CERROR("failed to start OST service\n"); GOTO(out_lprocfs, rc = -ENOMEM); @@ -1718,7 +1919,7 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf) obd->obd_proc_entry, target_print_req, oss_min_create_threads, oss_max_create_threads, - "ll_ost_creat"); + "ll_ost_creat", NULL); if (ost->ost_create_service == NULL) { CERROR("failed to start OST create service\n"); GOTO(out_service, rc = -ENOMEM); @@ -1734,7 +1935,8 @@ static int ost_setup(struct obd_device *obd, obd_count len, void *buf) OSC_REPLY_PORTAL, OSS_SERVICE_WATCHDOG_FACTOR, ost_handle, "ost_io", obd->obd_proc_entry, target_print_req, - oss_min_threads, oss_max_threads, "ll_ost_io"); + oss_min_threads, oss_max_threads, "ll_ost_io", + ost_hpreq_handler); if (ost->ost_io_service == NULL) { CERROR("failed to start OST I/O service\n"); GOTO(out_create, rc = -ENOMEM); -- 1.8.3.1