X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=2c4cef7d9f1b1a49e16f9c61ad17556eac7f0895;hb=f0d608786a27dfb8dddf06d6b086b491749557f1;hp=be0027bfc0bfecc514ca34067311e5ac2282281f;hpb=97b9148af8f7b8d31892f2f114e9461f85e9a4a4;p=fs%2Flustre-release.git diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index be0027b..2c4cef7 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -66,12 +66,11 @@ #include #include "osc_internal.h" -static quota_interface_t *quota_interface = NULL; -extern quota_interface_t osc_quota_interface; - static void osc_release_ppga(struct brw_page **ppga, obd_count count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); +static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, + int ptlrpc); int osc_cleanup(struct obd_device *obd); /* Pack OSC object metadata for disk storage (LE byte order). */ @@ -403,7 +402,7 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, /* do mds to ost setattr asynchronously */ if (!rqset) { /* Do not wait for response. */ - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); } else { req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret; @@ -415,7 +414,7 @@ int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo, sa->sa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); else ptlrpc_set_add_req(rqset, req); } @@ -554,7 +553,7 @@ int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo, sa->sa_upcall = upcall; sa->sa_cookie = cookie; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); else ptlrpc_set_add_req(rqset, req); @@ -768,7 +767,7 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa, } /* Do not wait for response */ - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); RETURN(0); } @@ -1219,14 +1218,16 @@ static int check_write_rcs(struct ptlrpc_request *req, static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) { if (p1->flag != p2->flag) { - unsigned mask = ~(OBD_BRW_FROM_GRANT| - OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC); + unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE| + OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA); /* warn if we try to combine flags that we don't know to be * safe to combine */ - if ((p1->flag & mask) != (p2->flag & mask)) - CERROR("is it ok to have flags 0x%x and 0x%x in the " - "same brw?\n", p1->flag, p2->flag); + if (unlikely((p1->flag & mask) != (p2->flag & mask))) { + CWARN("Saw flags 0x%x and 0x%x in the same brw, please " + "report this at http://bugs.whamcloud.com/\n", + p1->flag, p2->flag); + } return 0; } @@ -1266,7 +1267,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) cksum++; - return cksum; + return fini_checksum(cksum, cksum_type); } static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, @@ -1484,15 +1485,8 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, return 0; } - /* If this is mmaped file - it can be changed at any time */ - if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP) - return 1; - - if (oa->o_valid & OBD_MD_FLFLAGS) - cksum_type = cksum_type_unpack(oa->o_flags); - else - cksum_type = OBD_CKSUM_CRC32; - + cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ? + oa->o_flags : 0); new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE, cksum_type); @@ -1547,7 +1541,6 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) RETURN(-EPROTO); } -#ifdef HAVE_QUOTA_SUPPORT /* set/clear over quota flag for a uid/gid */ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE && body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) { @@ -1556,10 +1549,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n", body->oa.o_uid, body->oa.o_gid, body->oa.o_valid, body->oa.o_flags); - lquota_setdq(quota_interface, cli, qid, body->oa.o_valid, - body->oa.o_flags); + osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags); } -#endif osc_update_grant(cli, body); @@ -1620,10 +1611,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) char *router; cksum_type_t cksum_type; - if (body->oa.o_valid & OBD_MD_FLFLAGS) - cksum_type = cksum_type_unpack(body->oa.o_flags); - else - cksum_type = OBD_CKSUM_CRC32; + cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS? + body->oa.o_flags : 0); client_cksum = osc_checksum_bulk(rc, aa->aa_page_count, aa->aa_ppga, OST_READ, cksum_type); @@ -1978,7 +1967,6 @@ static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, int cmd) { - int optimal; ENTRY; if (lop->lop_num_pending == 0) @@ -1999,8 +1987,7 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, CDEBUG(D_CACHE, "urgent request forcing RPC\n"); RETURN(1); } - /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */ - optimal = cli->cl_max_pages_per_rpc; + if (cmd & OBD_BRW_WRITE) { /* trigger a write rpc stream as long as there are dirtiers * waiting for space. as they're waiting, they're not going to @@ -2009,13 +1996,8 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, CDEBUG(D_CACHE, "cache waiters forcing RPC\n"); RETURN(1); } - /* +16 to avoid triggering rpcs that would want to include pages - * that are being queued but which can't be made ready until - * the queuer finishes with the page. this is a wart for - * llite::commit_write() */ - optimal += 16; } - if (lop->lop_num_pending >= optimal) + if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc) RETURN(1); RETURN(0); @@ -2206,9 +2188,9 @@ static void osc_ap_completion(const struct lu_env *env, rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data, oap->oap_cmd, oa, rc); - /* ll_ap_completion (from llite) drops PG_locked. so, a new - * I/O on the page could start, but OSC calls it under lock - * and thus we can add oap back to pending safely */ + /* cl_page_completion() drops PG_locked. so, a new I/O on the page could + * start, but OSC calls it under lock and thus we can add oap back to + * pending safely */ if (rc) /* upper layer wants to leave the page on pending queue */ osc_oap_to_pending(oap); @@ -2217,6 +2199,18 @@ static void osc_ap_completion(const struct lu_env *env, EXIT; } +static int brw_queue_work(const struct lu_env *env, void *data) +{ + struct client_obd *cli = data; + + CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); + + client_obd_list_lock(&cli->cl_loi_list_lock); + osc_check_rpcs0(env, cli, 1); + client_obd_list_unlock(&cli->cl_loi_list_lock); + RETURN(0); +} + static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { @@ -2228,20 +2222,9 @@ static int brw_interpret(const struct lu_env *env, rc = osc_brw_fini_request(req, rc); CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc); if (osc_recoverable_error(rc)) { - /* Only retry once for mmaped files since the mmaped page - * might be modified at anytime. We have to retry at least - * once in case there WAS really a corruption of the page - * on the network, that was not caused by mmap() modifying - * the page. Bug11742 */ - if ((rc == -EAGAIN) && (aa->aa_resends > 0) && - aa->aa_oa->o_valid & OBD_MD_FLFLAGS && - aa->aa_oa->o_flags & OBD_FL_MMAP) { - rc = 0; - } else { - rc = osc_brw_redo_request(req, aa); - if (rc == 0) - RETURN(0); - } + rc = osc_brw_redo_request(req, aa); + if (rc == 0) + RETURN(0); } if (aa->aa_ocapa) { @@ -2250,7 +2233,6 @@ static int brw_interpret(const struct lu_env *env, } cli = aa->aa_cli; - client_obd_list_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters @@ -2278,11 +2260,14 @@ static int brw_interpret(const struct lu_env *env, osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); } osc_wake_cache_waiters(cli); - osc_check_rpcs(env, cli); + osc_check_rpcs0(env, cli, 1); client_obd_list_unlock(&cli->cl_loi_list_lock); + if (!async) - cl_req_completion(env, aa->aa_clerq, rc); + cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : + req->rq_bulk->bd_nob_transferred); osc_release_ppga(aa->aa_ppga, aa->aa_page_count); + ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); RETURN(rc); } @@ -2297,10 +2282,8 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, struct osc_brw_async_args *aa; struct obdo *oa = NULL; const struct obd_async_page_ops *ops = NULL; - void *caller_data = NULL; struct osc_async_page *oap; struct osc_async_page *tmp; - struct ost_body *body; struct cl_req *clerq = NULL; enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ; struct ldlm_lock *lock = NULL; @@ -2327,7 +2310,6 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, struct cl_page *page = osc_oap2cl_page(oap); if (ops == NULL) { ops = oap->oap_caller_ops; - caller_data = oap->oap_caller_data; clerq = cl_req_alloc(env, page, crt, 1 /* only 1-object rpcs for @@ -2376,7 +2358,6 @@ static struct ptlrpc_request *osc_build_req(const struct lu_env *env, * later setattr before earlier BRW (as determined by the request xid), * the OST will not use BRW timestamps. Sadly, there is no obvious * way to do this in a single call. bug 10150 */ - body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); cl_req_attr_set(env, clerq, &crattr, OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME); @@ -2430,8 +2411,8 @@ out: */ static int osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, - struct lov_oinfo *loi, - int cmd, struct loi_oap_pages *lop) + struct lov_oinfo *loi, int cmd, + struct loi_oap_pages *lop, pdl_policy_t pol) { struct ptlrpc_request *req; obd_count page_count = 0; @@ -2451,10 +2432,14 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, * with ASYNC_HP. We have to send out them as soon as possible. */ cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) { if (oap->oap_async_flags & ASYNC_HP) - cfs_list_move(&oap->oap_pending_item, &lop->lop_pending); + cfs_list_move(&oap->oap_pending_item, &rpc_list); + else if (!(oap->oap_brw_flags & OBD_BRW_SYNC)) + /* only do this for writeback pages. */ + cfs_list_move_tail(&oap->oap_pending_item, &rpc_list); if (++page_count >= cli->cl_max_pages_per_rpc) break; } + cfs_list_splice_init(&rpc_list, &lop->lop_pending); page_count = 0; /* first we find the pages we're allowed to work with */ @@ -2540,27 +2525,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, } if (oap == NULL) break; - /* - * Page submitted for IO has to be locked. Either by - * ->ap_make_ready() or by higher layers. - */ -#if defined(__KERNEL__) && defined(__linux__) - { - struct cl_page *page; - - page = osc_oap2cl_page(oap); - - if (page->cp_type == CPT_CACHEABLE && - !(PageLocked(oap->oap_page) && - (CheckWriteback(oap->oap_page, cmd)))) { - CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n", - oap->oap_page, - (long)oap->oap_page->flags, - oap->oap_async_flags); - LBUG(); - } - } -#endif /* take the page out of our book-keeping */ cfs_list_del_init(&oap->oap_pending_item); @@ -2646,7 +2610,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, lprocfs_oh_tally_log2(&cli->cl_write_offset_hist, (starting_offset >> CFS_PAGE_SHIFT) + 1); } - ptlrpc_lprocfs_brw(req, aa->aa_requested_nob); client_obd_list_lock(&cli->cl_loi_list_lock); @@ -2675,7 +2638,19 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight); req->rq_interpret_reply = brw_interpret; - ptlrpcd_add_req(req, PSCOPE_BRW); + + /* XXX: Maybe the caller can check the RPC bulk descriptor to see which + * CPU/NUMA node the majority of pages were allocated on, and try + * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED) + * to reduce cross-CPU memory traffic. + * + * But on the other hand, we expect that multiple ptlrpcd threads + * and the initial write sponsor can run in parallel, especially + * when data checksum is enabled, which is CPU-bound operation and + * single ptlrpcd thread cannot process in time. So more ptlrpcd + * threads sharing BRW load (with PDL_POLICY_ROUND) seems better. + */ + ptlrpcd_add_req(req, pol, -1); RETURN(1); } @@ -2749,12 +2724,15 @@ static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi) } /* called with the loi list lock held */ -void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) +static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc) { struct lov_oinfo *loi; int rc = 0, race_counter = 0; + pdl_policy_t pol; ENTRY; + pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND; + while ((loi = osc_next_loi(cli)) != NULL) { LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli)); @@ -2769,7 +2747,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) * do io on writes while there are cache waiters */ if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) { rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE, - &loi->loi_write_lop); + &loi->loi_write_lop, pol); if (rc < 0) { CERROR("Write request failed with %d\n", rc); @@ -2799,7 +2777,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) } if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) { rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ, - &loi->loi_read_lop); + &loi->loi_read_lop, pol); if (rc < 0) CERROR("Read request failed with %d\n", rc); @@ -2830,7 +2808,11 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) if (race_counter == 10) break; } - EXIT; +} + +void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) +{ + osc_check_rpcs0(env, cli, 0); } /* we're trying to queue a page in the osc so we're subject to the @@ -3008,7 +2990,7 @@ int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp, qid[USRQUOTA] = attr.cat_uid; qid[GRPQUOTA] = attr.cat_gid; if (rc == 0 && - lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA) + osc_quota_chkdq(cli, qid) == NO_QUOTA) rc = -EDQUOT; if (rc) RETURN(rc); @@ -3039,13 +3021,19 @@ int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp, } } - osc_oap_to_pending(oap); - loi_list_maint(cli, loi); - LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page, cmd); - osc_check_rpcs(env, cli); + osc_oap_to_pending(oap); + loi_list_maint(cli, loi); + if (!osc_max_rpc_in_flight(cli, loi) && + lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) { + LASSERT(cli->cl_writeback_work != NULL); + rc = ptlrpcd_queue_work(cli->cl_writeback_work); + + CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n", + cli, rc); + } client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(0); @@ -3214,7 +3202,7 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, obd_enqueue_update_f upcall, void *cookie, - int *flags, int rc) + int *flags, int agl, int rc) { int intent = *flags & LDLM_FL_HAS_INTENT; ENTRY; @@ -3232,7 +3220,8 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, } } - if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) { + if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) || + (rc == 0)) { *flags |= LDLM_FL_LVB_READY; CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n", lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime); @@ -3250,6 +3239,9 @@ static int osc_enqueue_interpret(const struct lu_env *env, struct ldlm_lock *lock; struct lustre_handle handle; __u32 mode; + struct ost_lvb *lvb; + __u32 lvb_len; + int *flags = aa->oa_flags; /* Make a local copy of a lock handle and a mode, because aa->oa_* * might be freed anytime after lock upcall has been called. */ @@ -3269,13 +3261,20 @@ static int osc_enqueue_interpret(const struct lu_env *env, /* Let CP AST to grant the lock first. */ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); + if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) { + lvb = NULL; + lvb_len = 0; + } else { + lvb = aa->oa_lvb; + lvb_len = sizeof(*aa->oa_lvb); + } + /* Complete obtaining the lock procedure. */ rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, - mode, aa->oa_flags, aa->oa_lvb, - sizeof(*aa->oa_lvb), &handle, rc); + mode, flags, lvb, lvb_len, &handle, rc); /* Complete osc stuff. */ - rc = osc_enqueue_fini(req, aa->oa_lvb, - aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc); + rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie, + flags, aa->oa_agl, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); @@ -3299,8 +3298,9 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp, struct lov_oinfo *loi, int flags, struct ost_lvb *lvb, __u32 mode, int rc) { + struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); + if (rc == ELDLM_OK) { - struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); __u64 tmp; LASSERT(lock != NULL); @@ -3321,13 +3321,21 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp, lock->l_policy_data.l_extent.end); } ldlm_lock_allow_match(lock); - LDLM_LOCK_PUT(lock); } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) { + LASSERT(lock != NULL); loi->loi_lvb = *lvb; + ldlm_lock_allow_match(lock); CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms); rc = ELDLM_OK; } + + if (lock != NULL) { + if (rc != ELDLM_OK) + ldlm_lock_fail_match(lock); + + LDLM_LOCK_PUT(lock); + } } EXPORT_SYMBOL(osc_update_enqueue); @@ -3346,11 +3354,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, obd_enqueue_update_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, struct lustre_handle *lockh, - struct ptlrpc_request_set *rqset, int async) + struct ptlrpc_request_set *rqset, int async, int agl) { struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; + int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); ldlm_mode_t mode; int rc; ENTRY; @@ -3384,13 +3393,20 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, mode = einfo->ei_mode; if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; - mode = ldlm_lock_match(obd->obd_namespace, - *flags | LDLM_FL_LVB_READY, res_id, + mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id, einfo->ei_type, policy, mode, lockh, 0); if (mode) { struct ldlm_lock *matched = ldlm_handle2lock(lockh); - if (osc_set_lock_data_with_check(matched, einfo)) { + if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) { + /* For AGL, if enqueue RPC is sent but the lock is not + * granted, then skip to process this strpe. + * Return -ECANCELED to tell the caller. */ + ldlm_lock_decref(lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(-ECANCELED); + } else if (osc_set_lock_data_with_check(matched, einfo)) { + *flags |= LDLM_FL_LVB_READY; /* addref the lock only if not async requests and PW * lock is matched whereas we asked for PR. */ if (!rqset && einfo->ei_mode != mode) @@ -3404,16 +3420,17 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, /* We already have a lock, and it's referenced */ (*upcall)(cookie, ELDLM_OK); - /* For async requests, decref the lock. */ if (einfo->ei_mode != mode) ldlm_lock_decref(lockh, LCK_PW); else if (rqset) + /* For async requests, decref the lock. */ ldlm_lock_decref(lockh, einfo->ei_mode); LDLM_LOCK_PUT(matched); RETURN(ELDLM_OK); - } else + } else { ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); + LDLM_LOCK_PUT(matched); + } } no_match: @@ -3452,11 +3469,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, aa->oa_cookie = cookie; aa->oa_lvb = lvb; aa->oa_lockh = lockh; + aa->oa_agl = !!agl; req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_enqueue_interpret; if (rqset == PTLRPCD_SET) - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); else ptlrpc_set_add_req(rqset, req); } else if (intent) { @@ -3465,7 +3483,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } - rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc); + rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc); if (intent) ptlrpc_req_finished(req); @@ -3487,7 +3505,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid, oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh, - rqset, rqset != NULL); + rqset, rqset != NULL, 0); RETURN(rc); } @@ -3890,8 +3908,7 @@ static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len, data->ioc_offset); GOTO(out, err); case OBD_IOC_POLL_QUOTACHECK: - err = lquota_poll_check(quota_interface, exp, - (struct if_quotacheck *)karg); + err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg); GOTO(out, err); case OBD_IOC_PING_TARGET: err = ptlrpc_obd_ping(obd); @@ -4156,7 +4173,7 @@ static int osc_set_info_async(struct obd_export *exp, obd_count keylen, ptlrpc_set_add_req(set, req); ptlrpc_check_set(NULL, set); } else - ptlrpcd_add_req(req, PSCOPE_OTHER); + ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); RETURN(0); } @@ -4451,6 +4468,7 @@ static int osc_cancel_for_recovery(struct ldlm_lock *lock) int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { + struct client_obd *cli = &obd->u.cli; int rc; ENTRY; @@ -4460,11 +4478,18 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) RETURN(rc); rc = client_obd_setup(obd, lcfg); - if (rc) { - ptlrpcd_decref(); - } else { + if (rc == 0) { + void *handler; + handler = ptlrpcd_alloc_work(cli->cl_import, + brw_queue_work, cli); + if (!IS_ERR(handler)) + cli->cl_writeback_work = handler; + else + rc = PTR_ERR(handler); + } + + if (rc == 0) { struct lprocfs_static_vars lvars = { 0 }; - struct client_obd *cli = &obd->u.cli; cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; lprocfs_osc_init_vars(&lvars); @@ -4491,6 +4516,8 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery); } + if (rc) + ptlrpcd_decref(); RETURN(rc); } @@ -4512,6 +4539,7 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) break; } case OBD_CLEANUP_EXPORTS: { + struct client_obd *cli = &obd->u.cli; /* LU-464 * for echo client, export may be on zombie list, wait for * zombie thread to cull it, because cli.cl_import will be @@ -4522,7 +4550,13 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) * client_disconnect_export() */ obd_zombie_barrier(); + if (cli->cl_writeback_work) { + ptlrpcd_destroy_work(cli->cl_writeback_work); + cli->cl_writeback_work = NULL; + } obd_cleanup_client_import(obd); + ptlrpc_lprocfs_unregister_obd(obd); + lprocfs_obd_cleanup(obd); rc = obd_llog_finish(obd, 0); if (rc != 0) CERROR("failed to cleanup llogging subsystems\n"); @@ -4537,11 +4571,9 @@ int osc_cleanup(struct obd_device *obd) int rc; ENTRY; - ptlrpc_lprocfs_unregister_obd(obd); - lprocfs_obd_cleanup(obd); /* free memory of osc quota cache */ - lquota_cleanup(quota_interface, obd); + osc_quota_cleanup(obd); rc = client_obd_cleanup(obd); @@ -4610,6 +4642,9 @@ struct obd_ops osc_obd_ops = { .o_llog_init = osc_llog_init, .o_llog_finish = osc_llog_finish, .o_process_config = osc_process_config, + .o_quotactl = osc_quotactl, + .o_quotacheck = osc_quotacheck, + .o_quota_adjust_qunit = osc_quota_adjust_qunit, }; extern struct lu_kmem_descr osc_caches[]; @@ -4625,22 +4660,16 @@ int __init osc_init(void) /* print an address of _any_ initialized kernel symbol from this * module, to allow debugging with gdb that doesn't support data * symbols from modules.*/ - CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches); + CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); rc = lu_kmem_init(osc_caches); lprocfs_osc_init_vars(&lvars); - cfs_request_module("lquota"); - quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface); - lquota_init(quota_interface); - init_obd_quota_ops(quota_interface, &osc_obd_ops); - + osc_quota_init(); rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars, LUSTRE_OSC_NAME, &osc_device_type); if (rc) { - if (quota_interface) - PORTAL_SYMBOL_PUT(osc_quota_interface); lu_kmem_fini(osc_caches); RETURN(rc); } @@ -4662,10 +4691,7 @@ static void /*__exit*/ osc_exit(void) { lu_device_type_fini(&osc_device_type); - lquota_exit(quota_interface); - if (quota_interface) - PORTAL_SYMBOL_PUT(osc_quota_interface); - + osc_quota_exit(); class_unregister_type(LUSTRE_OSC_NAME); lu_kmem_fini(osc_caches); }