return rc;
}
-static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
+static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
+ long writing_bytes)
{
- obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
+ obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
- LASSERT(!(body->oa.o_valid & bits));
+ LASSERT(!(oa->o_valid & bits));
- body->oa.o_valid |= bits;
- down(&cli->cl_dirty_sem);
- body->oa.o_blocks = cli->cl_dirty;
- body->oa.o_rdev = cli->cl_dirty_granted;
- up(&cli->cl_dirty_sem);
- CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
- cli->cl_dirty, cli->cl_dirty_granted);
+ oa->o_valid |= bits;
+ spin_lock(&cli->cl_loi_list_lock);
+ oa->o_dirty = cli->cl_dirty;
+ oa->o_undirty = cli->cl_dirty_max - oa->o_dirty;
+ oa->o_grant = cli->cl_avail_grant;
+ oa->o_dropped = cli->cl_lost_grant;
+ cli->cl_lost_grant = 0;
+ spin_unlock(&cli->cl_loi_list_lock);
+ CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
+ oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}
-static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+/* caller must hold loi_list_lock */
+static void osc_consume_write_grant(struct client_obd *cli,
+ struct osc_async_page *oap)
+{
+ cli->cl_dirty += PAGE_SIZE;
+ cli->cl_avail_grant -= PAGE_SIZE;
+ oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
+ CDEBUG(D_CACHE, "using %lu grant credits for oap %p\n", PAGE_SIZE, oap);
+ LASSERT(cli->cl_avail_grant >= 0);
+}
+
+/* caller must hold loi_list_lock */
+void osc_wake_cache_waiters(struct client_obd *cli)
{
- if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
- if (cli->cl_ost_can_grant) {
- CDEBUG(D_INODE, "%s can't grant\n",
- cli->cl_import->imp_target_uuid.uuid);
+ struct list_head *l, *tmp;
+ struct osc_cache_waiter *ocw;
+
+ list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+ /* if we can't dirty more, we must wait until some is written */
+ if (cli->cl_dirty + PAGE_SIZE > cli->cl_dirty_max) {
+ CDEBUG(D_CACHE, "no dirty room: dirty: %ld max %ld\n",
+ cli->cl_dirty, cli->cl_dirty_max);
+ return;
}
- cli->cl_ost_can_grant = 0;
- return;
+
+ /* if still dirty cache but no grant wait for pending RPCs that
+ * may yet return us some grant before doing sync writes */
+ if (cli->cl_brw_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
+ CDEBUG(D_CACHE, "%d BRWs in flight, no grant\n",
+ cli->cl_brw_in_flight);
+ return;
+ }
+
+ ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
+ list_del_init(&ocw->ocw_entry);
+ if (cli->cl_avail_grant < PAGE_SIZE) {
+ /* no more RPCs in flight to return grant, do sync IO */
+ ocw->ocw_rc = -EDQUOT;
+ CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
+ } else {
+ osc_consume_write_grant(cli, ocw->ocw_oap);
+ }
+ wake_up(&ocw->ocw_waitq);
}
- CDEBUG(D_ERROR, "got "LPU64" grant\n", body->oa.o_rdev);
- down(&cli->cl_dirty_sem);
- cli->cl_dirty_granted = body->oa.o_rdev;
- /* XXX check for over-run and wake up the io thread that
- * doesn't exist yet */
- up(&cli->cl_dirty_sem);
+ EXIT;
+}
+
+static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
+{
+ spin_lock(&cli->cl_loi_list_lock);
+ CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
+ cli->cl_avail_grant += body->oa.o_grant;
+ /* waiters are woken in brw_interpret_oap */
+ spin_unlock(&cli->cl_loi_list_lock);
}
/* We assume that the reason this OSC got a short read is because it read
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
if (p1->flag != p2->flag) {
- unsigned mask = ~(OBD_BRW_CREATE|OBD_BRW_FROM_GRANT);
+ unsigned mask = ~OBD_BRW_FROM_GRANT;
/* warn if we try to combine flags that we don't know to be
* safe to combine */
opc = ((cmd & OBD_BRW_WRITE) != 0) ? OST_WRITE : OST_READ;
for (niocount = i = 1; i < page_count; i++)
- if (!can_merge_pages (&pga[i - 1], &pga[i]))
+ if (!can_merge_pages(&pga[i - 1], &pga[i]))
niocount++;
size[0] = sizeof(*body);
LASSERT((void *)(niobuf - niocount) ==
lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
- osc_announce_cached(cli, body);
+ osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
spin_lock_irqsave(&req->rq_lock, flags);
req->rq_no_resend = 1;
spin_unlock_irqrestore(&req->rq_lock, flags);
if (opc == OST_WRITE) {
#if CHECKSUM_BULK
body->oa.o_valid |= OBD_MD_FLCKSUM;
- body->oa.o_nlink = cksum_pages(requested_nob, page_count, pga);
+ body->oa.o_cksum = cksum_pages(requested_nob, page_count, pga);
#endif
/* 1 RC per niobuf */
size[1] = sizeof(__u32) * niocount;
{
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
struct ost_body *body;
+ ENTRY;
if (rc < 0)
- return (rc);
+ RETURN(rc);
body = lustre_swab_repbuf(req, 0, sizeof(*body), lustre_swab_ost_body);
if (body == NULL) {
CERROR ("Can't unpack body\n");
- return (-EPROTO);
+ RETURN(-EPROTO);
}
osc_update_grant(cli, body);
if (req->rq_reqmsg->opc == OST_WRITE) {
if (rc > 0) {
CERROR ("Unexpected +ve rc %d\n", rc);
- return (-EPROTO);
+ RETURN(-EPROTO);
}
- return(check_write_rcs(req, niocount, page_count, pga));
+ RETURN(check_write_rcs(req, niocount, page_count, pga));
}
if (rc > requested_nob) {
CERROR("Unexpected rc %d (%d requested)\n", rc, requested_nob);
- return (-EPROTO);
+ RETURN(-EPROTO);
}
if (rc < requested_nob)
const struct ptlrpc_peer *peer =
&req->rq_import->imp_connection->c_peer;
static int cksum_counter;
- obd_count server_cksum = oa->o_nlink;
+ obd_count server_cksum = oa->o_cksum;
obd_count cksum = cksum_pages(rc, page_count, pga);
char str[PTL_NALFMT_SIZE];
LPX64" (%s)\n", server_cksum, cksum,
peer->peer_nid, str);
cksum_counter = 0;
- oa->o_nlink = cksum;
+ oa->o_cksum = cksum;
} else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
CWARN("Checksum %u from "LPX64" (%s) OK: %x\n",
cksum_counter, peer->peer_nid, str, cksum);
req->rq_import->imp_connection->c_peer.peer_nid);
}
#endif
- return (0);
+ RETURN(0);
}
static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa,
}
static void osc_check_rpcs(struct client_obd *cli);
-static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap);
+static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
+ int sent);
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi);
static void lop_update_pending(struct client_obd *cli,
struct loi_oap_pages *lop, int cmd, int delta);
list_del_init(&oap->oap_urgent_item);
loi = oap->oap_loi;
- lop = (oap->oap_cmd == OBD_BRW_WRITE) ?
+ lop = (oap->oap_cmd == OBD_BRW_WRITE) ?
&loi->loi_write_lop : &loi->loi_read_lop;
lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
loi_list_maint(oap->oap_cli, oap->oap_loi);
osic_complete_one(oap->oap_osic, &oap->oap_occ, 0);
oap->oap_osic = NULL;
-
}
unlock:
spin_unlock(&oap->oap_cli->cl_loi_list_lock);
}
-/* this must be called holding the list lock to give coverage to exit_cache,
+/* this must be called holding the loi list lock to give coverage to exit_cache,
* async_flag maintenance, and oap_request */
static void osc_complete_oap(struct client_obd *cli,
- struct osc_async_page *oap, int rc)
+ struct osc_async_page *oap, int sent, int rc)
{
- ENTRY;
- osc_exit_cache(cli, oap);
+ osc_exit_cache(cli, oap, sent);
oap->oap_async_flags = 0;
oap->oap_interrupted = 0;
oap->oap_caller_ops->ap_completion(oap->oap_caller_data, oap->oap_cmd,
rc);
- EXIT;
}
static int brw_interpret_oap(struct ptlrpc_request *request,
spin_lock(&cli->cl_loi_list_lock);
+ /* We need to decrement before osc_complete_oap->osc_wake_cache_waiters
+ * is called so we know whether to go to sync BRWs or wait for more
+ * RPCs to complete */
+ cli->cl_brw_in_flight--;
+
/* the caller may re-use the oap after the completion call so
* we need to clean it up a little */
list_for_each_safe(pos, n, &aa->aa_oaps) {
//oap->oap_page, oap->oap_page->index, oap);
list_del_init(&oap->oap_rpc_item);
- osc_complete_oap(cli, oap, rc);
+ osc_complete_oap(cli, oap, 1, rc);
}
- cli->cl_brw_in_flight--;
+ osc_wake_cache_waiters(cli);
osc_check_rpcs(cli);
spin_unlock(&cli->cl_loi_list_lock);
pga[i].pg = oap->oap_page;
pga[i].count = oap->oap_count;
pga[i].flag = oap->oap_brw_flags;
- //CDEBUG(D_INODE, "putting page %p index %lu oap %p into pga\n",
- //pga[i].pg, oap->oap_page->index, oap);
+ CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
+ pga[i].pg, oap->oap_page->index, oap, pga[i].flag);
i++;
}
int rc = ops->ap_make_ready(oap->oap_caller_data, cmd);
if (rc < 0)
CDEBUG(D_INODE, "oap %p page %p returned %d "
- "instead of ready\n", oap,
+ "instead of ready\n", oap,
oap->oap_page, rc);
switch (rc) {
case -EAGAIN:
/* llite is telling us that the page is still
* in commit_write and that we should try
- * and put it in an rpc again later. we
+ * and put it in an rpc again later. we
* break out of the loop so we don't create
- * a hole in the sequence of pages in the rpc
+ * a hole in the sequence of pages in the rpc
* stream.*/
pos = NULL;
break;
break;
default:
LASSERTF(0, "oap %p page %p returned %d "
- "from make_ready\n", oap,
+ "from make_ready\n", oap,
oap->oap_page, rc);
break;
}
/* ask the caller for the size of the io as the rpc leaves. */
if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE))
- oap->oap_count = ops->ap_refresh_count(
- oap->oap_caller_data,
- cmd);
+ oap->oap_count =
+ ops->ap_refresh_count(oap->oap_caller_data,cmd);
if (oap->oap_count <= 0) {
- CDEBUG(D_INODE, "oap %p count %d, completing\n", oap,
+ CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
oap->oap_count);
- osc_complete_oap(cli, oap, oap->oap_count);
+ osc_complete_oap(cli, oap, 0, oap->oap_count);
continue;
}
break;
}
+ osc_wake_cache_waiters(cli);
+
if (page_count == 0)
RETURN(0);
* were between the pending list and the rpc */
if (oap->oap_interrupted) {
CDEBUG(D_INODE, "oap %p interrupted\n", oap);
- osc_complete_oap(cli, oap, oap->oap_count);
+ osc_complete_oap(cli, oap, 0, oap->oap_count);
continue;
}
lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_brw_in_flight);
} else {
lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
- lprocfs_oh_tally(&cli->cl_write_rpc_hist,
+ lprocfs_oh_tally(&cli->cl_write_rpc_hist,
cli->cl_brw_in_flight);
}
list_for_each(pos, &aa->aa_oaps) {
oap = list_entry(pos, struct osc_async_page, oap_rpc_item);
if (oap->oap_interrupted) {
- CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
+ CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
oap, request);
ptlrpc_mark_interrupted(request);
break;
* that are being queued but which can't be made ready until
* the queuer finishes with the page. this is a wart for
* llite::commit_write() */
- optimal *= 2;
+ optimal += 16;
}
if (lop->lop_num_pending >= optimal)
RETURN(1);
RETURN(0);
}
-static void on_list(struct list_head *item, struct list_head *list,
+static void on_list(struct list_head *item, struct list_head *list,
int should_be_on)
{
if (list_empty(item) && should_be_on)
* can find pages to build into rpcs quickly */
static void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
{
- on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
+ on_list(&loi->loi_cli_item, &cli->cl_loi_ready_list,
lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE) ||
lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
- on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
+ on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
loi->loi_write_lop.lop_num_pending);
}
-#define LOI_DEBUG(LOI, STR, args...) \
- CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
- !list_empty(&(LOI)->loi_cli_item), \
+#define LOI_DEBUG(LOI, STR, args...) \
+ CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
+ !list_empty(&(LOI)->loi_cli_item), \
(LOI)->loi_write_lop.lop_num_pending, \
- !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
+ !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
(LOI)->loi_read_lop.lop_num_pending, \
- !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
- args) \
+ !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
+ args) \
struct lov_oinfo *osc_next_loi(struct client_obd *cli)
{
ENTRY;
/* first return all objects which we already know to have
- * pages ready to be stuffed into rpcs */
+ * pages ready to be stuffed into rpcs */
if (!list_empty(&cli->cl_loi_ready_list))
- RETURN(list_entry(cli->cl_loi_ready_list.next,
+ RETURN(list_entry(cli->cl_loi_ready_list.next,
struct lov_oinfo, loi_cli_item));
-
- /* then if we have cache waiters, return all objects with queued
+
+ /* then if we have cache waiters, return all objects with queued
* writes. This is especially important when many small files
* have filled up the cache and not been fired into rpcs because
 * they don't pass the nr_pending/object threshold */
if (!list_empty(&cli->cl_cache_waiters) &&
!list_empty(&cli->cl_loi_write_list))
- RETURN(list_entry(cli->cl_loi_write_list.next,
+ RETURN(list_entry(cli->cl_loi_write_list.next,
struct lov_oinfo, loi_write_item));
RETURN(NULL);
}
/* we're trying to queue a page in the osc so we're subject to the
* 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
* If the osc's queued pages are already at that limit, then we want to sleep
- * until there is space in the osc's queue for us. we need this goofy
- * little struct to really tell that our allocation was fulfilled in
- * the presence of pending signals */
-struct osc_cache_waiter {
- struct list_head ocw_entry;
- wait_queue_head_t ocw_waitq;
-};
+ * until there is space in the osc's queue for us. We also may be waiting for
+ * write credits from the OST if there are RPCs in flight that may return some
+ * before we fall back to sync writes.
+ *
+ * We need this to know our allocation was granted in the presence of signals */
static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
{
int rc;
ENTRY;
spin_lock(&cli->cl_loi_list_lock);
- rc = list_empty(&ocw->ocw_entry);
+ rc = list_empty(&ocw->ocw_entry) || cli->cl_brw_in_flight == 0;
spin_unlock(&cli->cl_loi_list_lock);
RETURN(rc);
};
+
+/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
+ * grant or cache space. */
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
struct osc_async_page *oap)
{
struct osc_cache_waiter ocw;
- struct l_wait_info lwi = {0};
- int rc = 0;
- ENTRY;
+ struct l_wait_info lwi = { 0 };
+
+ CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
+ cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
+ cli->cl_avail_grant);
- /* XXX check for ost grants here as well.. for now we ignore them. */
if (cli->cl_dirty_max < PAGE_SIZE)
- RETURN(-EDQUOT);
+ return(-EDQUOT);
- /* if we fail this test then cl_dirty contains at least one page
- * that will have to be completed after we release the lock */
- if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max) {
+
+ /* Hopefully normal case - cache space and write credits available */
+ if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max &&
+ cli->cl_avail_grant >= PAGE_SIZE) {
/* account for ourselves */
- cli->cl_dirty += PAGE_SIZE;
- GOTO(out, rc = 0);
+ osc_consume_write_grant(cli, oap);
+ return(0);
}
- init_waitqueue_head(&ocw.ocw_waitq);
- list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
+ /* Make sure that there are write rpcs in flight to wait for. This
+ * is a little silly as this object may not have any pending but
+ * other objects sure might. */
+ if (cli->cl_brw_in_flight) {
+ list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
+ init_waitqueue_head(&ocw.ocw_waitq);
+ ocw.ocw_oap = oap;
+ ocw.ocw_rc = 0;
- /* make sure that there are write rpcs in flight to wait for. this
- * is a little silly as this object may not have any pending
- * but other objects sure might. this should probably be cleaned. */
- loi_list_maint(cli, loi);
- osc_check_rpcs(cli);
- spin_unlock(&cli->cl_loi_list_lock);
+ loi_list_maint(cli, loi);
+ osc_check_rpcs(cli);
+ spin_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_INODE, "sleeping for cache space\n");
- l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
+ CDEBUG(0, "sleeping for cache space\n");
+ l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
- spin_lock(&cli->cl_loi_list_lock);
- if (!list_empty(&ocw.ocw_entry)) {
- rc = -EINTR;
- list_del(&ocw.ocw_entry);
+ spin_lock(&cli->cl_loi_list_lock);
+ if (!list_empty(&ocw.ocw_entry)) {
+ list_del(&ocw.ocw_entry);
+ RETURN(-EINTR);
+ }
+ RETURN(ocw.ocw_rc);
}
- GOTO(out, rc);
-out:
- if (rc == 0)
- oap->oap_brw_flags |= OBD_BRW_FROM_GRANT;
- return rc;
+
+ RETURN(-EDQUOT);
}
-/* the companion to enter_cache, called when an oap is now longer part of the
+/* the companion to enter_cache, called when an oap is no longer part of the
* dirty accounting.. so writeback completes or truncate happens before writing
* starts. must be called with the loi lock held. */
-static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
+static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
+ int sent)
{
- struct osc_cache_waiter *ocw;
ENTRY;
if (!(oap->oap_brw_flags & OBD_BRW_FROM_GRANT)) {
return;
}
- if (list_empty(&cli->cl_cache_waiters)) {
- cli->cl_dirty -= PAGE_SIZE;
- } else {
- ocw = list_entry(cli->cl_cache_waiters.next,
- struct osc_cache_waiter, ocw_entry);
- list_del_init(&ocw->ocw_entry);
- wake_up(&ocw->ocw_waitq);
+ oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
+ cli->cl_dirty -= PAGE_SIZE;
+ if (!sent) {
+ cli->cl_lost_grant += PAGE_SIZE;
+ CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
+ cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
}
- oap->oap_brw_flags &= ~OBD_BRW_FROM_GRANT;
EXIT;
}
if (!list_empty(&oap->oap_rpc_item))
GOTO(out, rc = -EBUSY);
- osc_exit_cache(cli, oap);
+ osc_exit_cache(cli, oap, 0);
+ osc_wake_cache_waiters(cli);
if (!list_empty(&oap->oap_urgent_item)) {
list_del_init(&oap->oap_urgent_item);
if (obd->u.cli.cl_conn_count == 1) {
/* flush any remaining cancel messages out to the target */
llog_sync(ctxt, exp);
-
+
/* balance the conn2export for oscc in osc_connect */
class_export_put(exp);
}
static int osc_invalidate_import(struct obd_device *obd,
struct obd_import *imp)
{
+ struct client_obd *cli;
LASSERT(imp->imp_obd == obd);
/* this used to try and tear down queued pages, but it was
* not correctly implemented. We'll have to do it again once
 * we call obd_invalidate_import() again */
- LBUG();
+ /* XXX And we still need to do this */
+
+ /* Reset grants, too */
+ cli = &obd->u.cli;
+ spin_lock(&cli->cl_loi_list_lock);
+ cli->cl_avail_grant = 0;
+ cli->cl_lost_grant = 0;
+ spin_unlock(&cli->cl_loi_list_lock);
+
RETURN(0);
}
int osc_setup(struct obd_device *obd, obd_count len, void *buf)
{
int rc;
-
+
rc = ptlrpcd_addref();
if (rc)
return rc;