static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc);
+static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli,
+ int ptlrpc);
int osc_cleanup(struct obd_device *obd);
/* Pack OSC object metadata for disk storage (LE byte order). */
if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
cksum++;
- return cksum;
+ return fini_checksum(cksum, cksum_type);
}
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
int cmd)
{
- int optimal;
ENTRY;
if (lop->lop_num_pending == 0)
CDEBUG(D_CACHE, "urgent request forcing RPC\n");
RETURN(1);
}
- /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
- optimal = cli->cl_max_pages_per_rpc;
+
if (cmd & OBD_BRW_WRITE) {
/* trigger a write rpc stream as long as there are dirtiers
* waiting for space. as they're waiting, they're not going to
CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
RETURN(1);
}
- /* +16 to avoid triggering rpcs that would want to include pages
- * that are being queued but which can't be made ready until
- * the queuer finishes with the page. this is a wart for
- * llite::commit_write() */
- optimal += 16;
}
- if (lop->lop_num_pending >= optimal)
+ if (lop->lop_num_pending >= cli->cl_max_pages_per_rpc)
RETURN(1);
RETURN(0);
EXIT;
}
+/* ptlrpcd work callback that flushes pending writeback RPCs for one client.
+ * Registered against the client import via ptlrpcd_alloc_work() in
+ * osc_setup(), with 'data' being the struct client_obd it was created for.
+ * Takes the LOI list lock and scans for RPC-worthy objects; the final '1'
+ * argument tells osc_check_rpcs0() it is running in ptlrpcd context (which
+ * selects PDL_POLICY_SAME rather than PDL_POLICY_ROUND).  Always returns 0. */
+static int brw_queue_work(const struct lu_env *env, void *data)
+{
+ struct client_obd *cli = data;
+
+ CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ osc_check_rpcs0(env, cli, 1);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ RETURN(0);
+}
+
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
}
cli = aa->aa_cli;
-
client_obd_list_lock(&cli->cl_loi_list_lock);
/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
}
osc_wake_cache_waiters(cli);
- osc_check_rpcs(env, cli);
+ osc_check_rpcs0(env, cli, 1);
client_obd_list_unlock(&cli->cl_loi_list_lock);
+
if (!async)
cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
req->rq_bulk->bd_nob_transferred);
*/
static int
osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
- struct lov_oinfo *loi,
- int cmd, struct loi_oap_pages *lop)
+ struct lov_oinfo *loi, int cmd,
+ struct loi_oap_pages *lop, pdl_policy_t pol)
{
struct ptlrpc_request *req;
obd_count page_count = 0;
* with ASYNC_HP. We have to send out them as soon as possible. */
cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
+ cfs_list_move(&oap->oap_pending_item, &rpc_list);
+ else if (!(oap->oap_brw_flags & OBD_BRW_SYNC))
+ /* only do this for writeback pages. */
+ cfs_list_move_tail(&oap->oap_pending_item, &rpc_list);
if (++page_count >= cli->cl_max_pages_per_rpc)
break;
}
+ cfs_list_splice_init(&rpc_list, &lop->lop_pending);
page_count = 0;
/* first we find the pages we're allowed to work with */
* single ptlrpcd thread cannot process in time. So more ptlrpcd
* threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
*/
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ ptlrpcd_add_req(req, pol, -1);
RETURN(1);
}
}
/* called with the loi list lock held */
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
+static void osc_check_rpcs0(const struct lu_env *env, struct client_obd *cli, int ptlrpc)
{
struct lov_oinfo *loi;
int rc = 0, race_counter = 0;
+ pdl_policy_t pol;
ENTRY;
+ pol = ptlrpc ? PDL_POLICY_SAME : PDL_POLICY_ROUND;
+
while ((loi = osc_next_loi(cli)) != NULL) {
LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
* do io on writes while there are cache waiters */
if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
- &loi->loi_write_lop);
+ &loi->loi_write_lop, pol);
if (rc < 0) {
CERROR("Write request failed with %d\n", rc);
}
if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
- &loi->loi_read_lop);
+ &loi->loi_read_lop, pol);
if (rc < 0)
CERROR("Read request failed with %d\n", rc);
if (race_counter == 10)
break;
}
- EXIT;
+}
+
+/* Public wrapper kept for existing callers: scan the client for pending
+ * RPCs from a non-ptlrpcd context.  The '0' makes osc_check_rpcs0() use
+ * PDL_POLICY_ROUND instead of PDL_POLICY_SAME.  Caller must hold the
+ * LOI list lock (see the "called with the loi list lock held" contract
+ * on osc_check_rpcs0 above). */
+void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
+{
+ osc_check_rpcs0(env, cli, 0);
}
/* we're trying to queue a page in the osc so we're subject to the
}
}
- osc_oap_to_pending(oap);
- loi_list_maint(cli, loi);
-
LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
cmd);
- osc_check_rpcs(env, cli);
+ osc_oap_to_pending(oap);
+ loi_list_maint(cli, loi);
+ if (!osc_max_rpc_in_flight(cli, loi) &&
+ lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
+ LASSERT(cli->cl_writeback_work != NULL);
+ rc = ptlrpcd_queue_work(cli->cl_writeback_work);
+
+ CDEBUG(D_CACHE, "Queued writeback work for client obd %p/%d.\n",
+ cli, rc);
+ }
client_obd_list_unlock(&cli->cl_loi_list_lock);
RETURN(0);
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
obd_enqueue_update_f upcall, void *cookie,
- int *flags, int rc)
+ int *flags, int agl, int rc)
{
int intent = *flags & LDLM_FL_HAS_INTENT;
ENTRY;
}
}
- if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
+ if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
+ (rc == 0)) {
*flags |= LDLM_FL_LVB_READY;
CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
struct ldlm_lock *lock;
struct lustre_handle handle;
__u32 mode;
+ struct ost_lvb *lvb;
+ __u32 lvb_len;
+ int *flags = aa->oa_flags;
/* Make a local copy of a lock handle and a mode, because aa->oa_*
* might be freed anytime after lock upcall has been called. */
/* Let CP AST to grant the lock first. */
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
+ if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
+ lvb = NULL;
+ lvb_len = 0;
+ } else {
+ lvb = aa->oa_lvb;
+ lvb_len = sizeof(*aa->oa_lvb);
+ }
+
/* Complete obtaining the lock procedure. */
rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
- mode, aa->oa_flags, aa->oa_lvb,
- sizeof(*aa->oa_lvb), &handle, rc);
+ mode, flags, lvb, lvb_len, &handle, rc);
/* Complete osc stuff. */
- rc = osc_enqueue_fini(req, aa->oa_lvb,
- aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
+ rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
+ flags, aa->oa_agl, rc);
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
struct lov_oinfo *loi, int flags,
struct ost_lvb *lvb, __u32 mode, int rc)
{
+ struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
+
if (rc == ELDLM_OK) {
- struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
__u64 tmp;
LASSERT(lock != NULL);
lock->l_policy_data.l_extent.end);
}
ldlm_lock_allow_match(lock);
- LDLM_LOCK_PUT(lock);
} else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
+ LASSERT(lock != NULL);
loi->loi_lvb = *lvb;
+ ldlm_lock_allow_match(lock);
CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
" kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
rc = ELDLM_OK;
}
+
+ if (lock != NULL) {
+ if (rc != ELDLM_OK)
+ ldlm_lock_fail_match(lock);
+
+ LDLM_LOCK_PUT(lock);
+ }
}
EXPORT_SYMBOL(osc_update_enqueue);
obd_enqueue_update_f upcall, void *cookie,
struct ldlm_enqueue_info *einfo,
struct lustre_handle *lockh,
- struct ptlrpc_request_set *rqset, int async)
+ struct ptlrpc_request_set *rqset, int async, int agl)
{
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req = NULL;
int intent = *flags & LDLM_FL_HAS_INTENT;
+ int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
ldlm_mode_t mode;
int rc;
ENTRY;
mode = einfo->ei_mode;
if (einfo->ei_mode == LCK_PR)
mode |= LCK_PW;
- mode = ldlm_lock_match(obd->obd_namespace,
- *flags | LDLM_FL_LVB_READY, res_id,
+ mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
einfo->ei_type, policy, mode, lockh, 0);
if (mode) {
struct ldlm_lock *matched = ldlm_handle2lock(lockh);
- if (osc_set_lock_data_with_check(matched, einfo)) {
+ if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
+ /* For AGL, if enqueue RPC is sent but the lock is not
+ * granted, then skip processing this stripe.
+ * Return -ECANCELED to tell the caller. */
+ ldlm_lock_decref(lockh, mode);
+ LDLM_LOCK_PUT(matched);
+ RETURN(-ECANCELED);
+ } else if (osc_set_lock_data_with_check(matched, einfo)) {
+ *flags |= LDLM_FL_LVB_READY;
/* addref the lock only if not async requests and PW
* lock is matched whereas we asked for PR. */
if (!rqset && einfo->ei_mode != mode)
/* We already have a lock, and it's referenced */
(*upcall)(cookie, ELDLM_OK);
- /* For async requests, decref the lock. */
if (einfo->ei_mode != mode)
ldlm_lock_decref(lockh, LCK_PW);
else if (rqset)
+ /* For async requests, decref the lock. */
ldlm_lock_decref(lockh, einfo->ei_mode);
LDLM_LOCK_PUT(matched);
RETURN(ELDLM_OK);
- } else
+ } else {
ldlm_lock_decref(lockh, mode);
- LDLM_LOCK_PUT(matched);
+ LDLM_LOCK_PUT(matched);
+ }
}
no_match:
aa->oa_cookie = cookie;
aa->oa_lvb = lvb;
aa->oa_lockh = lockh;
+ aa->oa_agl = !!agl;
req->rq_interpret_reply =
(ptlrpc_interpterer_t)osc_enqueue_interpret;
RETURN(rc);
}
- rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
+ rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
if (intent)
ptlrpc_req_finished(req);
&oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
- rqset, rqset != NULL);
+ rqset, rqset != NULL, 0);
RETURN(rc);
}
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
+ struct client_obd *cli = &obd->u.cli;
int rc;
ENTRY;
RETURN(rc);
rc = client_obd_setup(obd, lcfg);
- if (rc) {
- ptlrpcd_decref();
- } else {
+ if (rc == 0) {
+ void *handler;
+ handler = ptlrpcd_alloc_work(cli->cl_import,
+ brw_queue_work, cli);
+ if (!IS_ERR(handler))
+ cli->cl_writeback_work = handler;
+ else
+ rc = PTR_ERR(handler);
+ }
+
+ if (rc == 0) {
struct lprocfs_static_vars lvars = { 0 };
- struct client_obd *cli = &obd->u.cli;
cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
lprocfs_osc_init_vars(&lvars);
ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
}
+ if (rc)
+ ptlrpcd_decref();
RETURN(rc);
}
break;
}
case OBD_CLEANUP_EXPORTS: {
+ struct client_obd *cli = &obd->u.cli;
/* LU-464
* for echo client, export may be on zombie list, wait for
* zombie thread to cull it, because cli.cl_import will be
* client_disconnect_export()
*/
obd_zombie_barrier();
+ if (cli->cl_writeback_work) {
+ ptlrpcd_destroy_work(cli->cl_writeback_work);
+ cli->cl_writeback_work = NULL;
+ }
obd_cleanup_client_import(obd);
ptlrpc_lprocfs_unregister_obd(obd);
lprocfs_obd_cleanup(obd);
/* print an address of _any_ initialized kernel symbol from this
* module, to allow debugging with gdb that doesn't support data
* symbols from modules.*/
- CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
+ CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
rc = lu_kmem_init(osc_caches);