/*
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
- */
-/*
- * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
cli->cl_dirty_transit -= CFS_PAGE_SIZE;
}
if (!sent) {
- cli->cl_lost_grant += CFS_PAGE_SIZE;
+ /* Reclaim grant from truncated pages. This is used to solve the
+ * write-truncate problem where all grant is gone (moved to lost_grant).
+ * For a vfs write this problem can be easily solved by a sync
+ * write, however, this is not an option for page_mkwrite()
+ * because grant has to be allocated before a page becomes
+ * dirty. */
+ if (cli->cl_avail_grant < PTLRPC_MAX_BRW_SIZE)
+ cli->cl_avail_grant += CFS_PAGE_SIZE;
+ else
+ cli->cl_lost_grant += CFS_PAGE_SIZE;
CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
} else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
&ocw->ocw_oap->oap_brw_page);
}
+ CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld\n",
+ ocw, ocw->ocw_oap, cli->cl_avail_grant);
+
cfs_waitq_signal(&ocw->ocw_waitq);
}
/* size[REQ_REC_OFF] still sizeof (*body) */
if (opc == OST_WRITE) {
- if (unlikely(cli->cl_checksum) &&
+ if (cli->cl_checksum &&
!sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
/* store cl_cksum_type in a local variable since
* it can be changed via lprocfs */
req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
sizeof(__u32) * niocount);
} else {
- if (unlikely(cli->cl_checksum) &&
+ if (cli->cl_checksum &&
!sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
body->oa.o_flags = 0;
struct ptlrpc_request *req;
int rc;
cfs_waitq_t waitq;
- int resends = 0;
+ int generation, resends = 0;
struct l_wait_info lwi;
ENTRY;
cfs_waitq_init(&waitq);
+ generation = exp->exp_obd->u.cli.cl_import->imp_generation;
restart_bulk:
rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
if (rc != 0)
return (rc);
+ if (resends) {
+ req->rq_generation_set = 1;
+ req->rq_import_generation = generation;
+ }
+
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT && req->rq_resend) {
rc = osc_brw_fini_request(req, rc);
ptlrpc_req_finished(req);
+ /* When the server returns -EINPROGRESS, the client should always retry
+ * regardless of the number of times the bulk was resent already. */
if (osc_recoverable_error(rc)) {
resends++;
- if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
- CERROR("too many resend retries, returning error\n");
- RETURN(-EIO);
+ if (rc != -EINPROGRESS &&
+ !client_should_resend(resends, &exp->exp_obd->u.cli)) {
+ CERROR("%s: too many resend retries for object: "
+ ""LPU64":"LPU64", rc = %d.\n",
+ exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
+ goto out;
+ }
+ if (generation !=
+ exp->exp_obd->u.cli.cl_import->imp_generation) {
+ CDEBUG(D_HA, "%s: resend cross eviction for object: "
+ ""LPU64":"LPU64", rc = %d.\n",
+ exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
+ goto out;
}
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+ lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
+ NULL);
l_wait_event(waitq, 0, &lwi);
goto restart_bulk;
}
-
+out:
+ if (rc == -EAGAIN || rc == -EINPROGRESS)
+ rc = -EIO;
RETURN (rc);
}
int rc = 0;
ENTRY;
- if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
- CERROR("too many resent retries, returning error\n");
- RETURN(-EIO);
- }
-
DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
new_req->rq_interpret_reply = request->rq_interpret_reply;
new_req->rq_async_args = request->rq_async_args;
new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
+ new_req->rq_generation_set = 1;
+ new_req->rq_import_generation = request->rq_import_generation;
new_aa = ptlrpc_req_async_args(new_req);
rc = osc_brw_fini_request(req, rc);
CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+ /* When the server returns -EINPROGRESS, the client should always retry
+ * regardless of the number of times the bulk was resent already. */
if (osc_recoverable_error(rc)) {
- rc = osc_brw_redo_request(req, aa);
+ if (req->rq_import_generation !=
+ req->rq_import->imp_generation) {
+ CDEBUG(D_HA, "%s: resend cross eviction for object: "
+ ""LPU64":"LPU64", rc = %d.\n",
+ req->rq_import->imp_obd->obd_name,
+ aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
+ } else if (rc == -EINPROGRESS ||
+ client_should_resend(aa->aa_resends, aa->aa_cli)) {
+ rc = osc_brw_redo_request(req, aa);
+ } else {
+ CERROR("%s: too many resent retries for object: "
+ ""LPU64":"LPU64", rc = %d.\n",
+ req->rq_import->imp_obd->obd_name,
+ aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
+ }
+
if (rc == 0)
RETURN(0);
+ else if (rc == -EAGAIN || rc == -EINPROGRESS)
+ rc = -EIO;
}
if (aa->aa_ocapa) {
break;
}
- osc_wake_cache_waiters(cli);
-
loi_list_maint(cli, loi);
client_obd_list_unlock(&cli->cl_loi_list_lock);
osc_check_rpcs0(env, cli, 0);
}
-/* we're trying to queue a page in the osc so we're subject to the
- * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
- * If the osc's queued pages are already at that limit, then we want to sleep
- * until there is space in the osc's queue for us. We also may be waiting for
- * write credits from the OST if there are RPCs in flight that may return some
- * before we fall back to sync writes.
- *
- * We need this know our allocation was granted in the presence of signals */
-static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
-{
- int rc;
- ENTRY;
- client_obd_list_lock(&cli->cl_loi_list_lock);
- rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(rc);
-};
-
/**
* Non-blocking version of osc_enter_cache() that consumes grant only when it
* is available.
{
struct osc_cache_waiter ocw;
struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-
+ int rc = -EDQUOT;
ENTRY;
CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
osc_enter_cache_try(env, cli, loi, oap, 0))
RETURN(0);
- /* It is safe to block as a cache waiter as long as there is grant
- * space available or the hope of additional grant being returned
- * when an in flight write completes. Using the write back cache
- * if possible is preferable to sending the data synchronously
- * because write pages can then be merged in to large requests.
- * The addition of this cache waiter will causing pending write
- * pages to be sent immediately. */
- if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
+ /* We can get here for two reasons: too many dirty pages in cache, or
+ * run out of grants. In both cases we should write dirty pages out.
+ * Adding a cache waiter will trigger urgent write-out no matter what
+ * RPC size will be.
+ * The exiting condition is no avail grants and no dirty pages caching,
+ * that really means there is no space on the OST. */
+ cfs_waitq_init(&ocw.ocw_waitq);
+ ocw.ocw_oap = oap;
+ while (cli->cl_dirty > 0) {
cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
- cfs_waitq_init(&ocw.ocw_waitq);
- ocw.ocw_oap = oap;
ocw.ocw_rc = 0;
loi_list_maint(cli, loi);
osc_check_rpcs(env, cli);
client_obd_list_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_CACHE, "sleeping for cache space\n");
- l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
+ CDEBUG(D_CACHE, "%s: sleeping for cache space @ %p for %p\n",
+ cli->cl_import->imp_obd->obd_name, &ocw, oap);
+
+ rc = l_wait_event(ocw.ocw_waitq, cfs_list_empty(&ocw.ocw_entry), &lwi);
client_obd_list_lock(&cli->cl_loi_list_lock);
- if (!cfs_list_empty(&ocw.ocw_entry)) {
- cfs_list_del(&ocw.ocw_entry);
- RETURN(-EINTR);
- }
- RETURN(ocw.ocw_rc);
+ cfs_list_del_init(&ocw.ocw_entry);
+ if (rc < 0)
+ break;
+
+ rc = ocw.ocw_rc;
+ if (rc != -EDQUOT)
+ break;
}
- RETURN(-EDQUOT);
+ RETURN(rc);
}
if (lock != NULL) {
if (rc != ELDLM_OK)
- ldlm_lock_fail_match(lock, rc);
+ ldlm_lock_fail_match(lock);
LDLM_LOCK_PUT(lock);
}
LASSERT(olg == &obd->obd_olg);
- cfs_mutex_down(&olg->olg_cat_processing);
+ cfs_mutex_lock(&olg->olg_cat_processing);
rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
if (rc) {
CERROR("rc: %d\n", rc);
}
out:
- cfs_mutex_up(&olg->olg_cat_processing);
+ cfs_mutex_unlock(&olg->olg_cat_processing);
return rc;
}
ptlrpc_add_rqs_to_pool);
CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
- cfs_sema_init(&cli->cl_grant_sem, 1);
ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
}