Whamcloud - gitweb
r=zab,phil
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 0577cf2..c007a6b 100644 (file)
@@ -532,24 +532,24 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         return rc;
 }
 
-static void osc_announce_cached(struct client_obd *cli, struct ost_body *body)
+static void osc_announce_cached(struct client_obd *cli, struct obdo *oa)
 {
-        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLRDEV;
+        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
 
-        LASSERT(!(body->oa.o_valid & bits));
+        LASSERT(!(oa->o_valid & bits));
 
-        body->oa.o_valid |= bits;
-        down(&cli->cl_dirty_sem);
-        body->oa.o_blocks = cli->cl_dirty;
-        body->oa.o_rdev = cli->cl_dirty_granted;
-        up(&cli->cl_dirty_sem);
+        oa->o_valid |= bits;
+        spin_lock(&cli->cl_loi_list_lock);
+        oa->o_blocks = cli->cl_dirty;
+        oa->o_grant = cli->cl_dirty_granted;
+        spin_unlock(&cli->cl_loi_list_lock);
         CDEBUG(D_INODE, "announcing "LPU64" dirty "LPU64" granted\n",
                cli->cl_dirty, cli->cl_dirty_granted);
 }
 
 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
 {
-        if(!(body->oa.o_valid & OBD_MD_FLRDEV)) {
+        if(!(body->oa.o_valid & OBD_MD_FLGRANT)) {
                 if (cli->cl_ost_can_grant) {
                         CDEBUG(D_INODE, "%s can't grant\n",
                                cli->cl_import->imp_target_uuid.uuid);
@@ -558,12 +558,13 @@ static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
                 return;
         }
 
-        CDEBUG(D_ERROR, "got "LPU64" grant\n", body->oa.o_rdev);
-        down(&cli->cl_dirty_sem);
-        cli->cl_dirty_granted = body->oa.o_rdev;
-        /* XXX check for over-run and wake up the io thread that
-         * doesn't exist yet */
-        up(&cli->cl_dirty_sem);
+        CDEBUG(D_SUPER, "got "LPU64" grant\n", body->oa.o_grant);
+        spin_lock(&cli->cl_loi_list_lock);
+        if (cli->cl_dirty_granted != body->oa.o_grant) {
+                cli->cl_dirty_granted = body->oa.o_grant;
+                osc_adjust_cache(cli);
+        }
+        spin_unlock(&cli->cl_loi_list_lock);
 }
 
 /* We assume that the reason this OSC got a short read is because it read
@@ -760,7 +761,7 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
 
         LASSERT((void *)(niobuf - niocount) ==
                 lustre_msg_buf(req->rq_reqmsg, 2, niocount * sizeof(*niobuf)));
-        osc_announce_cached(cli, body);
+        osc_announce_cached(cli, &body->oa);
         spin_lock_irqsave(&req->rq_lock, flags);
         req->rq_no_resend = 1;
         spin_unlock_irqrestore(&req->rq_lock, flags);
@@ -1471,6 +1472,7 @@ static void osc_check_rpcs(struct client_obd *cli)
 struct osc_cache_waiter {
         struct list_head        ocw_entry;
         wait_queue_head_t       ocw_waitq;
+        int                     ocw_rc;
 };
 static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
 {
@@ -1481,6 +1483,42 @@ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
         spin_unlock(&cli->cl_loi_list_lock);
         RETURN(rc);
 };
+
+static inline obd_size osc_cache_cap(struct client_obd *cli)
+{
+        if (cli->cl_ost_can_grant)
+                return min(cli->cl_dirty_granted, cli->cl_dirty_max);
+
+        return cli->cl_dirty_max;
+}
+void osc_adjust_cache(struct client_obd *cli)
+{
+        struct list_head *l, *tmp;
+        struct osc_cache_waiter *ocw;
+        obd_size cache_cap = osc_cache_cap(cli);
+
+        ENTRY;
+
+        list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+                if (cli->cl_dirty + PAGE_SIZE > cache_cap &&
+                    cache_cap >= PAGE_SIZE)
+                        break;
+
+                ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
+                list_del_init(&ocw->ocw_entry);
+                if (cache_cap < PAGE_SIZE) {
+                /* "They" said we are starting synchronous operations, so
+                   wakeup everybody waiting for pages in cache and make them
+                   go away unsatisfied. */
+                        ocw->ocw_rc = -EDQUOT;
+                } else {
+                        cli->cl_dirty += PAGE_SIZE;
+                }
+                wake_up(&ocw->ocw_waitq);
+        }
+
+        EXIT;
+}
 static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                            struct osc_async_page *oap)
 {
@@ -1489,19 +1527,19 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
         int rc = 0;
         ENTRY;
 
-        /* XXX check for ost grants here as well.. for now we ignore them. */
-        if (cli->cl_dirty_max < PAGE_SIZE)
+        if (osc_cache_cap(cli) < PAGE_SIZE)
                 RETURN(-EDQUOT);
 
         /* if we fail this test then cl_dirty contains at least one page
          * that will have to be completed after we release the lock */
-        if (cli->cl_dirty + PAGE_SIZE <= cli->cl_dirty_max) {
+        if (cli->cl_dirty + PAGE_SIZE <= osc_cache_cap(cli)) {
                 /* account for ourselves */
                 cli->cl_dirty += PAGE_SIZE;
                 GOTO(out, rc = 0);
         }
 
         init_waitqueue_head(&ocw.ocw_waitq);
+        ocw.ocw_rc = 0;
         list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
 
         /* make sure that there are write rpcs in flight to wait for. this
@@ -1515,6 +1553,7 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
         l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
 
         spin_lock(&cli->cl_loi_list_lock);
+        rc = ocw.ocw_rc;
         if (!list_empty(&ocw.ocw_entry)) {
                 rc = -EINTR;
                 list_del(&ocw.ocw_entry);
@@ -1539,7 +1578,9 @@ static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap)
                 return;
         }
 
-        if (list_empty(&cli->cl_cache_waiters)) {
+        /* If nobody waits for cache space or if we need to shrink it */
+        if (list_empty(&cli->cl_cache_waiters) ||
+            (cli->cl_dirty > osc_cache_cap(cli))) {
                 cli->cl_dirty -= PAGE_SIZE;
         } else {
                 ocw = list_entry(cli->cl_cache_waiters.next,
@@ -2629,11 +2670,19 @@ static int osc_lock_contains(struct obd_export *exp, struct lov_stripe_md *lsm,
 static int osc_invalidate_import(struct obd_device *obd,
                                  struct obd_import *imp)
 {
+        struct client_obd *cli;
         LASSERT(imp->imp_obd == obd);
         /* this used to try and tear down queued pages, but it was
          * not correctly implemented.  We'll have to do it again once
          * we call obd_invalidate_import() agian */
-        LBUG();
+        /* XXX And we still need to do this */
+
+        /* Reset grants, too */
+        cli = &obd->u.cli;
+        spin_lock(&cli->cl_loi_list_lock);
+        cli->cl_ost_can_grant = cli->cl_dirty_granted = 0;
+        spin_unlock(&cli->cl_loi_list_lock);
+        
         RETURN(0);
 }