X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=2c4cef7d9f1b1a49e16f9c61ad17556eac7f0895;hp=f4f2c7aa7fa6099633d94f891e2afdb69aba2ab1;hb=f0d608786a27dfb8dddf06d6b086b491749557f1;hpb=046a965cf0f54265208e74566d181f1ee9a3a729 diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index f4f2c7a..2c4cef7 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -69,6 +69,8 @@ static void osc_release_ppga(struct brw_page **ppga, obd_count count); static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc); +static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, + int ptlrpc); int osc_cleanup(struct obd_device *obd); /* Pack OSC object metadata for disk storage (LE byte order). */ @@ -1265,7 +1267,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND)) cksum++; - return cksum; + return fini_checksum(cksum, cksum_type); } static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, @@ -1965,7 +1967,6 @@ static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap, static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, int cmd) { - int optimal; ENTRY; if (lop->lop_num_pending == 0) @@ -1986,8 +1987,7 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, CDEBUG(D_CACHE, "urgent request forcing RPC\n"); RETURN(1); } - /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */ - optimal = cli->cl_max_pages_per_rpc; + if (cmd & OBD_BRW_WRITE) { /* trigger a write rpc stream as long as there are dirtiers * waiting for space. as they're waiting, they're not going to @@ -1996,13 +1996,8 @@ static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop, CDEBUG(D_CACHE, "cache waiters forcing RPC\n"); RETURN(1); } - /* +16 to avoid triggering rpcs that would want to include pages - * that are being queued but which can't be made ready until - * the queuer finishes with the page. this is a wart for - * llite::commit_write() */ - optimal += 16; } - if (lop->lop_num_pending >= optimal) + if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc) RETURN(1); RETURN(0); @@ -2204,6 +2199,18 @@ static void osc_ap_completion(const struct lu_env *env, EXIT; } +static int brw_queue_work(const struct lu_env *env, void *data) +{ + struct client_obd *cli = data; + + CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli); + + client_obd_list_lock(&cli->cl_loi_list_lock); + osc_check_rpcs0(env, cli, 1); + client_obd_list_unlock(&cli->cl_loi_list_lock); + RETURN(0); +} + static int brw_interpret(const struct lu_env *env, struct ptlrpc_request *req, void *data, int rc) { @@ -2226,7 +2233,6 @@ static int brw_interpret(const struct lu_env *env, } cli = aa->aa_cli; - client_obd_list_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters @@ -2254,8 +2260,9 @@ static int brw_interpret(const struct lu_env *env, osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); } osc_wake_cache_waiters(cli); - osc_check_rpcs(env, cli); + osc_check_rpcs0(env, cli, 1); client_obd_list_unlock(&cli->cl_loi_list_lock); + if (!async) cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc : req->rq_bulk->bd_nob_transferred); @@ -2404,8 +2411,8 @@ out: */ static int osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, - struct lov_oinfo *loi, - int cmd, struct loi_oap_pages *lop) + struct lov_oinfo *loi, int cmd, + struct loi_oap_pages *lop, pdl_policy_t pol) { struct ptlrpc_request *req; obd_count page_count = 0; @@ -2425,10 +2432,14 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, * with ASYNC_HP. We have to send out them as soon as possible. */ cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) { if (oap->oap_async_flags & ASYNC_HP) - cfs_list_move(&oap->oap_pending_item, &lop->lop_pending); + cfs_list_move(&oap->oap_pending_item, &rpc_list); + else if (!(oap->oap_brw_flags & OBD_BRW_SYNC)) + /* only do this for writeback pages. */ + cfs_list_move_tail(&oap->oap_pending_item, &rpc_list); if (++page_count >= cli->cl_max_pages_per_rpc) break; } + cfs_list_splice_init(&rpc_list, &lop->lop_pending); page_count = 0; /* first we find the pages we're allowed to work with */ @@ -2639,7 +2650,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli, * single ptlrpcd thread cannot process in time. So more ptlrpcd * threads sharing BRW load (with PDL_POLICY_ROUND) seems better. */ - ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1); + ptlrpcd_add_req(req, pol, -1); RETURN(1); } @@ -2713,12 +2724,15 @@ static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi) } /* called with the loi list lock held */ -void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) +static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc) { struct lov_oinfo *loi; int rc = 0, race_counter = 0; + pdl_policy_t pol; ENTRY; + pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND; + while ((loi = osc_next_loi(cli)) != NULL) { LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli)); @@ -2733,7 +2747,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) * do io on writes while there are cache waiters */ if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) { rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE, - &loi->loi_write_lop); + &loi->loi_write_lop, pol); if (rc < 0) { CERROR("Write request failed with %d\n", rc); @@ -2763,7 +2777,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) } if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) { rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ, - &loi->loi_read_lop); + &loi->loi_read_lop, pol); if (rc < 0) CERROR("Read request failed with %d\n", rc); @@ -2794,7 +2808,11 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) if (race_counter == 10) break; } - EXIT; +} + +void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli) +{ + osc_check_rpcs0(env, cli, 0); } /* we're trying to queue a page in the osc so we're subject to the @@ -3003,13 +3021,19 @@ int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp, } } - osc_oap_to_pending(oap); - loi_list_maint(cli, loi); - LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page, cmd); - osc_check_rpcs(env, cli); + osc_oap_to_pending(oap); + loi_list_maint(cli, loi); + if (!osc_max_rpc_in_flight(cli, loi) && + lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) { + LASSERT(cli->cl_writeback_work != NULL); + rc = ptlrpcd_queue_work(cli->cl_writeback_work); + + CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n", + cli, rc); + } client_obd_list_unlock(&cli->cl_loi_list_lock); RETURN(0); @@ -3178,7 +3202,7 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, obd_enqueue_update_f upcall, void *cookie, - int *flags, int rc) + int *flags, int agl, int rc) { int intent = *flags & LDLM_FL_HAS_INTENT; ENTRY; @@ -3196,7 +3220,8 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, } } - if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) { + if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) || + (rc == 0)) { *flags |= LDLM_FL_LVB_READY; CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n", lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime); @@ -3214,6 +3239,9 @@ static int osc_enqueue_interpret(const struct lu_env *env, struct ldlm_lock *lock; struct lustre_handle handle; __u32 mode; + struct ost_lvb *lvb; + __u32 lvb_len; + int *flags = aa->oa_flags; /* Make a local copy of a lock handle and a mode, because aa->oa_* * might be freed anytime after lock upcall has been called. */ @@ -3233,13 +3261,20 @@ static int osc_enqueue_interpret(const struct lu_env *env, /* Let CP AST to grant the lock first. */ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); + if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) { + lvb = NULL; + lvb_len = 0; + } else { + lvb = aa->oa_lvb; + lvb_len = sizeof(*aa->oa_lvb); + } + /* Complete obtaining the lock procedure. */ rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, - mode, aa->oa_flags, aa->oa_lvb, - sizeof(*aa->oa_lvb), &handle, rc); + mode, flags, lvb, lvb_len, &handle, rc); /* Complete osc stuff. */ - rc = osc_enqueue_fini(req, aa->oa_lvb, - aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc); + rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie, + flags, aa->oa_agl, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); @@ -3263,8 +3298,9 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp, struct lov_oinfo *loi, int flags, struct ost_lvb *lvb, __u32 mode, int rc) { + struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); + if (rc == ELDLM_OK) { - struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); __u64 tmp; LASSERT(lock != NULL); @@ -3285,13 +3321,21 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp, lock->l_policy_data.l_extent.end); } ldlm_lock_allow_match(lock); - LDLM_LOCK_PUT(lock); } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) { + LASSERT(lock != NULL); loi->loi_lvb = *lvb; + ldlm_lock_allow_match(lock); CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms); rc = ELDLM_OK; } + + if (lock != NULL) { + if (rc != ELDLM_OK) + ldlm_lock_fail_match(lock); + + LDLM_LOCK_PUT(lock); + } } EXPORT_SYMBOL(osc_update_enqueue); @@ -3310,11 +3354,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, obd_enqueue_update_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, struct lustre_handle *lockh, - struct ptlrpc_request_set *rqset, int async) + struct ptlrpc_request_set *rqset, int async, int agl) { struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; + int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); ldlm_mode_t mode; int rc; ENTRY; @@ -3348,13 +3393,20 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, mode = einfo->ei_mode; if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; - mode = ldlm_lock_match(obd->obd_namespace, - *flags | LDLM_FL_LVB_READY, res_id, + mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id, einfo->ei_type, policy, mode, lockh, 0); if (mode) { struct ldlm_lock *matched = ldlm_handle2lock(lockh); - if (osc_set_lock_data_with_check(matched, einfo)) { + if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) { + /* For AGL, if enqueue RPC is sent but the lock is not + * granted, then skip to process this strpe. + * Return -ECANCELED to tell the caller. */ + ldlm_lock_decref(lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(-ECANCELED); + } else if (osc_set_lock_data_with_check(matched, einfo)) { + *flags |= LDLM_FL_LVB_READY; /* addref the lock only if not async requests and PW * lock is matched whereas we asked for PR. */ if (!rqset && einfo->ei_mode != mode) @@ -3368,16 +3420,17 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, /* We already have a lock, and it's referenced */ (*upcall)(cookie, ELDLM_OK); - /* For async requests, decref the lock. */ if (einfo->ei_mode != mode) ldlm_lock_decref(lockh, LCK_PW); else if (rqset) + /* For async requests, decref the lock. */ ldlm_lock_decref(lockh, einfo->ei_mode); LDLM_LOCK_PUT(matched); RETURN(ELDLM_OK); - } else + } else { ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); + LDLM_LOCK_PUT(matched); + } } no_match: @@ -3416,6 +3469,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, aa->oa_cookie = cookie; aa->oa_lvb = lvb; aa->oa_lockh = lockh; + aa->oa_agl = !!agl; req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_enqueue_interpret; @@ -3429,7 +3483,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } - rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc); + rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc); if (intent) ptlrpc_req_finished(req); @@ -3451,7 +3505,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid, oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh, - rqset, rqset != NULL); + rqset, rqset != NULL, 0); RETURN(rc); } @@ -4414,6 +4468,7 @@ static int osc_cancel_for_recovery(struct ldlm_lock *lock) int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) { + struct client_obd *cli = &obd->u.cli; int rc; ENTRY; @@ -4423,11 +4478,18 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) RETURN(rc); rc = client_obd_setup(obd, lcfg); - if (rc) { - ptlrpcd_decref(); - } else { + if (rc == 0) { + void *handler; + handler = ptlrpcd_alloc_work(cli->cl_import, + brw_queue_work, cli); + if (!IS_ERR(handler)) + cli->cl_writeback_work = handler; + else + rc = PTR_ERR(handler); + } + + if (rc == 0) { struct lprocfs_static_vars lvars = { 0 }; - struct client_obd *cli = &obd->u.cli; cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL; lprocfs_osc_init_vars(&lvars); @@ -4454,6 +4516,8 @@ int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg) ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery); } + if (rc) + ptlrpcd_decref(); RETURN(rc); } @@ -4475,6 +4539,7 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) break; } case OBD_CLEANUP_EXPORTS: { + struct client_obd *cli = &obd->u.cli; /* LU-464 * for echo client, export may be on zombie list, wait for * zombie thread to cull it, because cli.cl_import will be @@ -4485,6 +4550,10 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage) * client_disconnect_export() */ obd_zombie_barrier(); + if (cli->cl_writeback_work) { + ptlrpcd_destroy_work(cli->cl_writeback_work); + cli->cl_writeback_work = NULL; + } obd_cleanup_client_import(obd); ptlrpc_lprocfs_unregister_obd(obd); lprocfs_obd_cleanup(obd); @@ -4591,7 +4660,7 @@ int __init osc_init(void) /* print an address of _any_ initialized kernel symbol from this * module, to allow debugging with gdb that doesn't support data * symbols from modules.*/ - CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches); + CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches); rc = lu_kmem_init(osc_caches);