return rc;
}
-static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
+static void osc_announce_cached(struct client_obd *cli, struct obdo *oa)
{
- obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
+ obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
- LASSERT(!(body->oa.o_valid & bits));
+ LASSERT(!(oa->o_valid & bits));
- body->oa.o_valid |= bits;
- down(&cli->cl_dirty_sem);
- body->oa.o_blocks = cli->cl_dirty;
- body->oa.o_rdev = cli->cl_dirty_granted;
- up(&cli->cl_dirty_sem);
+ oa->o_valid |= bits;
+ spin_lock(&cli->cl_loi_list_lock);
+ oa->o_blocks = cli->cl_dirty;
+ oa->o_grant = cli->cl_dirty_granted;
+ spin_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
cli->cl_dirty, cli->cl_dirty_granted);
}
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
- if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
+ if(!(body->oa.o_valid & OBD_MD_FLGRANT)) {
if (cli->cl_ost_can_grant) {
CDEBUG(D_INODE, "%s can't grant\n",
cli->cl_import->imp_target_uuid.uuid);
return;
}
- CDEBUG(D_ERROR, "got "LPU64" grant\n", body->oa.o_rdev);
- down(&cli->cl_dirty_sem);
- cli->cl_dirty_granted = body->oa.o_rdev;
- /* XXX check for over-run and wake up the io thread that
- * doesn't exist yet */
- up(&cli->cl_dirty_sem);
+ CDEBUG(D_SUPER, "got "LPU64" grant\n", body->oa.o_grant);
+ spin_lock(&cli->cl_loi_list_lock);
+ if (cli->cl_dirty_granted != body->oa.o_grant) {
+ cli->cl_dirty_granted = body->oa.o_grant;
+ osc_adjust_cache(cli);
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
}
/* We assume that the reason this OSC got a short read is because it read
LASSERT((void *)(niobuf - niocount) ==
lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
- osc_announce_cached(cli, body);
+ osc_announce_cached(cli, &body->oa);
spin_lock_irqsave(&req->rq_lock, flags);
req->rq_no_resend = 1;
spin_unlock_irqrestore(&req->rq_lock, flags);
struct osc_cache_waiter {
struct list_head ocw_entry;
wait_queue_head_t ocw_waitq;
+ int ocw_rc;
};
static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
{
spin_unlock(&cli->cl_loi_list_lock);
RETURN(rc);
};
+
+static inline obd_size osc_cache_cap(struct client_obd *cli)
+{
+ if (cli->cl_ost_can_grant)
+ return min(cli->cl_dirty_granted, cli->cl_dirty_max);
+
+ return cli->cl_dirty_max;
+}
+void osc_adjust_cache(struct client_obd *cli)
+{
+ struct list_head *l, *tmp;
+ struct osc_cache_waiter *ocw;
+ obd_size cache_cap = osc_cache_cap(cli);
+
+ ENTRY;
+
+ list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+ if (cli->cl_dirty + PAGE_SIZE > cache_cap &&
+ cache_cap >= PAGE_SIZE)
+ break;
+
+ ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
+ list_del_init(&ocw->ocw_entry);
+ if (cache_cap < PAGE_SIZE) {
+ /* "They" said we are starting synchronous operations, so
+ wakeup everybody waiting for pages in cache and make them
+ go away unsatisfied. */
+ ocw->ocw_rc = -EDQUOT;
+ } else {
+ cli->cl_dirty += PAGE_SIZE;
+ }
+ wake_up(&ocw->ocw_waitq);
+ }
+
+ EXIT;
+}
static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
struct osc_async_page *oap)
{
int rc = 0;
ENTRY;
- /* XXX check for ost grants here as well.. for now we ignore them. */
- if (cli->cl_dirty_max < PAGE_SIZE)
+ if (osc_cache_cap(cli) < PAGE_SIZE)
RETURN(-EDQUOT);
/* if we fail this test then cl_dirty contains at least one page
* that will have to be completed after we release the lock */
- if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max) {
+ if (cli->cl_dirty + PAGE_SIZE <= osc_cache_cap(cli)) {
/* account for ourselves */
cli->cl_dirty += PAGE_SIZE;
GOTO(out, rc = 0);
}
init_waitqueue_head(&ocw.ocw_waitq);
+ ocw.ocw_rc = 0;
list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
/* make sure that there are write rpcs in flight to wait for. this
l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
spin_lock(&cli->cl_loi_list_lock);
+ rc = ocw.ocw_rc;
if (!list_empty(&ocw.ocw_entry)) {
rc = -EINTR;
list_del(&ocw.ocw_entry);
return;
}
- if (list_empty(&cli->cl_cache_waiters)) {
+ /* If nobody waits for cache space or if we need to shrink it */
+ if (list_empty(&cli->cl_cache_waiters) ||
+ (cli->cl_dirty > osc_cache_cap(cli))) {
cli->cl_dirty -= PAGE_SIZE;
} else {
ocw = list_entry(cli->cl_cache_waiters.next,
static int osc_invalidate_import(struct obd_device *obd,
struct obd_import *imp)
{
+ struct client_obd *cli;
LASSERT(imp->imp_obd == obd);
/* this used to try and tear down queued pages, but it was
* not correctly implemented. We'll have to do it again once
* we call obd_invalidate_import() agian */
- LBUG();
+ /* XXX And we still need to do this */
+
+ /* Reset grants, too */
+ cli = &obd->u.cli;
+ spin_lock(&cli->cl_loi_list_lock);
+ cli->cl_ost_can_grant = cli->cl_dirty_granted = 0;
+ spin_unlock(&cli->cl_loi_list_lock);
+
RETURN(0);
}