/* ----- part 2 ----- */ \
__ext->oe_grants, __ext->oe_nr_pages, \
list_empty_marker(&__ext->oe_pages), \
- cfs_waitq_active(&__ext->oe_waitq) ? '+' : '-', \
+ waitqueue_active(&__ext->oe_waitq) ? '+' : '-', \
__ext->oe_osclock, __ext->oe_mppr, __ext->oe_owner, \
/* ----- part 4 ----- */ \
## __VA_ARGS__); \
/* TODO: validate the state machine */
ext->oe_state = state;
- cfs_waitq_broadcast(&ext->oe_waitq);
+ wake_up_all(&ext->oe_waitq);
}
static struct osc_extent *osc_extent_alloc(struct osc_object *obj)
CFS_INIT_LIST_HEAD(&ext->oe_link);
ext->oe_state = OES_INV;
CFS_INIT_LIST_HEAD(&ext->oe_pages);
- cfs_waitq_init(&ext->oe_waitq);
+ init_waitqueue_head(&ext->oe_waitq);
ext->oe_osclock = NULL;
return ext;
"%s: wait ext to %d timedout, recovery in progress?\n",
osc_export(obj)->exp_obd->obd_name, state);
- lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
+ lwi = LWI_INTR(NULL, NULL);
rc = l_wait_event(ext->oe_waitq, extent_wait_cb(ext, state),
&lwi);
}
cfs_list_del_init(&oap->oap_pending_item);
cl_page_get(page);
- lu_ref_add(&page->cp_reference, "truncate", cfs_current());
+ lu_ref_add(&page->cp_reference, "truncate", current);
if (cl_page_own(env, io, page) == 0) {
cl_page_unmap(env, io, page);
LASSERT(0);
}
- lu_ref_del(&page->cp_reference, "truncate", cfs_current());
+ lu_ref_del(&page->cp_reference, "truncate", current);
cl_page_put(env, page);
--ext->oe_nr_pages;
last->oap_count = osc_refresh_count(env, last, OBD_BRW_WRITE);
LASSERT(last->oap_count > 0);
LASSERT(last->oap_page_off + last->oap_count <= PAGE_CACHE_SIZE);
+ spin_lock(&last->oap_lock);
last->oap_async_flags |= ASYNC_COUNT_STABLE;
+ spin_unlock(&last->oap_lock);
}
/* for the rest of pages, we don't need to call osf_refresh_count()
cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
oap->oap_count = PAGE_CACHE_SIZE - oap->oap_page_off;
+ spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+ spin_unlock(&oap->oap_lock);
}
}
ENTRY;
cmd &= ~OBD_BRW_NOQUOTA;
- LASSERT(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ));
- LASSERT(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE));
+ LASSERTF(equi(page->cp_state == CPS_PAGEIN, cmd == OBD_BRW_READ),
+ "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
+ LASSERTF(equi(page->cp_state == CPS_PAGEOUT, cmd == OBD_BRW_WRITE),
+ "cp_state:%u, cmd:%d\n", page->cp_state, cmd);
LASSERT(opg->ops_transfer_pinned);
/*
#define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do { \
struct client_obd *__tmp = (cli); \
CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %d/%d " \
- "dropped: %ld avail: %ld, reserved: %ld, flight: %d } " \
- "lru {in list: %d, left: %d, waiters: %d }" fmt, \
+ "unstable_pages: %d/%d dropped: %ld avail: %ld, " \
+ "reserved: %ld, flight: %d } lru {in list: %d, " \
+ "left: %d, waiters: %d }" fmt, \
__tmp->cl_import->imp_obd->obd_name, \
__tmp->cl_dirty, __tmp->cl_dirty_max, \
cfs_atomic_read(&obd_dirty_pages), obd_max_dirty_pages, \
+ cfs_atomic_read(&obd_unstable_pages), obd_max_dirty_pages, \
__tmp->cl_lost_grant, __tmp->cl_avail_grant, \
__tmp->cl_reserved_grant, __tmp->cl_w_in_flight, \
cfs_atomic_read(&__tmp->cl_lru_in_list), \
return 0;
if (cli->cl_dirty + PAGE_CACHE_SIZE <= cli->cl_dirty_max &&
- cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages) {
+ cfs_atomic_read(&obd_unstable_pages) + 1 +
+ cfs_atomic_read(&obd_dirty_pages) <= obd_max_dirty_pages) {
osc_consume_write_grant(cli, &oap->oap_brw_page);
if (transient) {
cli->cl_dirty_transit += PAGE_CACHE_SIZE;
* RPC size will be.
* The exiting condition is no avail grants and no dirty pages caching,
* that really means there is no space on the OST. */
- cfs_waitq_init(&ocw.ocw_waitq);
+ init_waitqueue_head(&ocw.ocw_waitq);
ocw.ocw_oap = oap;
ocw.ocw_grant = bytes;
while (cli->cl_dirty > 0 || cli->cl_w_in_flight > 0) {
/* l_wait_event is interrupted by signal, or timed out */
if (rc < 0) {
- if (rc == -ETIMEDOUT) {
+ switch (rc) {
+ case -ETIMEDOUT:
OSC_DUMP_GRANT(D_ERROR, cli,
"try to reserve %d.\n", bytes);
osc_extent_tree_dump(D_ERROR, osc);
rc = -EDQUOT;
+ break;
+ case -EINTR:
+ /* Ensures restartability - LU-3581 */
+ rc = -ERESTARTSYS;
+ break;
+ default:
+ CDEBUG(D_CACHE, "%s: event for cache space @"
+ " %p never arrived due to %d\n",
+ cli->cl_import->imp_obd->obd_name,
+ &ocw, rc);
+ break;
}
-
cfs_list_del_init(&ocw.ocw_entry);
GOTO(out, rc);
}
ocw->ocw_rc = -EDQUOT;
/* we can't dirty more */
if ((cli->cl_dirty + PAGE_CACHE_SIZE > cli->cl_dirty_max) ||
- (cfs_atomic_read(&obd_dirty_pages) + 1 >
- obd_max_dirty_pages)) {
+ (cfs_atomic_read(&obd_unstable_pages) + 1 +
+ cfs_atomic_read(&obd_dirty_pages) > obd_max_dirty_pages)) {
CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
"osc max %ld, sys max %d\n", cli->cl_dirty,
cli->cl_dirty_max, obd_max_dirty_pages);
CDEBUG(D_CACHE, "wake up %p for oap %p, avail grant %ld, %d\n",
ocw, ocw->ocw_oap, cli->cl_avail_grant, ocw->ocw_rc);
- cfs_waitq_signal(&ocw->ocw_waitq);
+ wake_up(&ocw->ocw_waitq);
}
EXIT;
ar->ar_force_sync = 0;
}
+/* Performs "unstable" page accounting. This function balances the
+ * increment operations performed in osc_inc_unstable_pages. It is
+ * registered as the RPC request callback, and is executed when the
+ * bulk RPC is committed on the server. Thus at this point, the pages
+ * involved in the bulk transfer are no longer considered unstable. */
+void osc_dec_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ obd_count page_count = desc->bd_iov_count;
+ int i;
+
+ /* No unstable page tracking */
+ if (cli->cl_cache == NULL)
+ return;
+
+ LASSERT(page_count >= 0);
+
+ for (i = 0; i < page_count; i++)
+ dec_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+ cfs_atomic_sub(page_count, &cli->cl_cache->ccc_unstable_nr);
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+
+ cfs_atomic_sub(page_count, &obd_unstable_pages);
+ LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+
+ spin_lock(&req->rq_lock);
+ req->rq_committed = 1;
+ req->rq_unstable = 0;
+ spin_unlock(&req->rq_lock);
+
+ wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
+}
+
+/* "unstable" page accounting. See: osc_dec_unstable_pages. */
+void osc_inc_unstable_pages(struct ptlrpc_request *req)
+{
+ struct ptlrpc_bulk_desc *desc = req->rq_bulk;
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
+ obd_count page_count = desc->bd_iov_count;
+ int i;
+
+ /* No unstable page tracking */
+ if (cli->cl_cache == NULL)
+ return;
+
+ LASSERT(page_count >= 0);
+
+ for (i = 0; i < page_count; i++)
+ inc_zone_page_state(desc->bd_iov[i].kiov_page, NR_UNSTABLE_NFS);
+
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_unstable_nr) >= 0);
+ cfs_atomic_add(page_count, &cli->cl_cache->ccc_unstable_nr);
+
+ LASSERT(cfs_atomic_read(&obd_unstable_pages) >= 0);
+ cfs_atomic_add(page_count, &obd_unstable_pages);
+
+ spin_lock(&req->rq_lock);
+
+ /* If the request has already been committed (i.e. brw_commit
+ * called via rq_commit_cb), we need to undo the unstable page
+ * increments we just performed because rq_commit_cb wont be
+ * called again. Otherwise, just set the commit callback so the
+ * unstable page accounting is properly updated when the request
+ * is committed */
+ if (req->rq_committed) {
+ /* Drop lock before calling osc_dec_unstable_pages */
+ spin_unlock(&req->rq_lock);
+ osc_dec_unstable_pages(req);
+ spin_lock(&req->rq_lock);
+ } else {
+ req->rq_unstable = 1;
+ req->rq_commit_cb = osc_dec_unstable_pages;
+ }
+
+ spin_unlock(&req->rq_lock);
+}
+
/* this must be called holding the loi list lock to give coverage to exit_cache,
* async_flag maintenance, and oap_request */
static void osc_ap_completion(const struct lu_env *env, struct client_obd *cli,
ENTRY;
if (oap->oap_request != NULL) {
+ if (rc == 0)
+ osc_inc_unstable_pages(oap->oap_request);
+
xid = ptlrpc_req_xid(oap->oap_request);
ptlrpc_req_finished(oap->oap_request);
oap->oap_request = NULL;
RETURN(0);
cfs_list_for_each_entry(tmp, rpclist, oe_link) {
- EASSERT(tmp->oe_owner == cfs_current(), tmp);
+ EASSERT(tmp->oe_owner == current, tmp);
#if 0
if (overlapped(tmp, ext)) {
OSC_EXTENT_DUMP(D_ERROR, tmp, "overlapped %p.\n", ext);
*pc += ext->oe_nr_pages;
cfs_list_move_tail(&ext->oe_link, rpclist);
- ext->oe_owner = cfs_current();
+ ext->oe_owner = current;
RETURN(1);
}
cl_object_get(obj);
client_obd_list_unlock(&cli->cl_loi_list_lock);
lu_object_ref_add_at(&obj->co_lu, &link, "check",
- cfs_current());
+ current);
/* attempt some read/write balancing by alternating between
* reads and writes in an object. The makes_rpc checks here
osc_list_maint(cli, osc);
lu_object_ref_del_at(&obj->co_lu, &link, "check",
- cfs_current());
+ current);
cl_object_put(env, obj);
client_obd_list_lock(&cli->cl_loi_list_lock);
oap->oap_cmd = cmd;
oap->oap_page_off = ops->ops_from;
oap->oap_count = ops->ops_to - ops->ops_from;
+ /* No need to hold a lock here,
+ * since this page is not in any list yet. */
oap->oap_async_flags = 0;
oap->oap_brw_flags = brw_flags;
EASSERT(ext->oe_start >= start &&
ext->oe_max_end <= end, ext);
osc_extent_state_set(ext, OES_LOCKING);
- ext->oe_owner = cfs_current();
+ ext->oe_owner = current;
cfs_list_move_tail(&ext->oe_link,
&discard_list);
osc_update_pending(obj, OBD_BRW_WRITE,