Whamcloud - gitweb
b=21571 stacksize and locking fixes for loadgen patch from umka
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
index ea2721e..368842e 100644 (file)
@@ -113,10 +113,10 @@ void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
                 obd_size left_space = filter_grant_space_left(exp);
                 struct filter_obd *filter = &exp->exp_obd->u.filter;
 
-                /*Only if left_space < fo_tot_clients * 32M, 
+                /*Only if left_space < fo_tot_clients * 32M,
                  *then the grant space could be shrinked */
-                if (left_space < filter->fo_tot_granted_clients * 
-                                 FILTER_GRANT_SHRINK_LIMIT) { 
+                if (left_space < filter->fo_tot_granted_clients *
+                                 FILTER_GRANT_SHRINK_LIMIT) {
                         fed->fed_grant -= oa->o_grant;
                         filter->fo_tot_granted -= oa->o_grant;
                         CDEBUG(D_CACHE, "%s: cli %s/%p shrink "LPU64
@@ -198,9 +198,13 @@ restat:
 /* Calculate how much grant space to allocate to this client, based on how
  * much space is currently free and how much of that is already granted.
  *
+ * if @conservative != 0, we limit the maximum grant to FILTER_GRANT_CHUNK;
+ * otherwise we satisfy as much of the requested amount as we can, which
+ * is usually the case when a client reconnects.
+ *
  * Caller must hold obd_osfs_lock. */
 long filter_grant(struct obd_export *exp, obd_size current_grant,
-                  obd_size want, obd_size fs_space_left)
+                  obd_size want, obd_size fs_space_left, int conservative)
 {
         struct obd_device *obd = exp->exp_obd;
         struct filter_export_data *fed = &exp->exp_filter_data;
@@ -223,16 +227,11 @@ long filter_grant(struct obd_export *exp, obd_size current_grant,
                        obd->obd_name, exp->exp_client_uuid.uuid, exp, want);
         } else if (current_grant < want &&
                    current_grant < fed->fed_grant + FILTER_GRANT_CHUNK) {
-                grant = min((want >> blockbits),
-                            (fs_space_left >> blockbits) / 8);
-                grant <<= blockbits;
+                grant = min(want + (1 << blockbits) - 1, fs_space_left / 8);
+                grant &= ~((1ULL << blockbits) - 1);
 
                 if (grant) {
-                        /* Allow >FILTER_GRANT_CHUNK size when clients
-                         * reconnect due to a server reboot.
-                         */
-                        if ((grant > FILTER_GRANT_CHUNK) &&
-                            (!obd->obd_recovering))
+                        if (grant > FILTER_GRANT_CHUNK && conservative)
                                 grant = FILTER_GRANT_CHUNK;
 
                         obd->u.filter.fo_tot_granted += grant;
@@ -272,7 +271,7 @@ long filter_grant(struct obd_export *exp, obd_size current_grant,
  * generate more memory pressure, but at the same time use __GFP_NOMEMALLOC
  * in order not to exhaust emergency reserves.
  *
- * See Bug 19529 and Bug 19917 for details. 
+ * See Bug 19529 and Bug 19917 for details.
  */
 static struct page *filter_get_page(struct obd_device *obd,
                                     struct inode *inode,
@@ -282,8 +281,8 @@ static struct page *filter_get_page(struct obd_device *obd,
         struct page *page;
 
         page = find_or_create_page(inode->i_mapping, offset >> CFS_PAGE_SHIFT,
-                                   (localreq ? (GFP_NOFS | __GFP_HIGHMEM) 
-                                             : (GFP_HIGHUSER | __GFP_NOMEMALLOC)));
+                                   (localreq ? (GFP_NOFS | __GFP_HIGHMEM)
+                                             : GFP_HIGHUSER));
         if (unlikely(page == NULL))
                 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_NO_PAGE, 1);
 
@@ -341,56 +340,30 @@ static int filter_map_remote_to_local(int objcount, struct obd_ioobj *obj,
 }
 
 /*
- * the function is used to free all pages used for request
- * just to mimic cacheless OSS which don't occupy much memory
- */
-void filter_invalidate_cache(struct obd_device *obd, struct obd_ioobj *obj,
-                             struct niobuf_remote *nb, struct inode *inode)
-{
-        struct niobuf_remote *rnb;
-        int i;
-
-        LASSERT(inode != NULL);
-
-        for (i = 0, rnb = nb; i < obj->ioo_bufcnt; i++, rnb++) {
-                obd_off start;
-                obd_off end;
-
-                start = rnb->offset >> CFS_PAGE_SHIFT;
-                end = (rnb->offset + rnb->len) >> CFS_PAGE_SHIFT;
-                invalidate_mapping_pages(inode->i_mapping, start, end);
-                /* just to avoid warnings */
-                start = 0;
-                end = 0;
-        }
-}
-
-/*
  * the invalidate above doesn't work during read because lnet pins pages.
  * The truncate is used here instead to drop pages from cache
  */
-void filter_truncate_cache(struct obd_device *obd, struct obd_ioobj *obj,
-                           struct niobuf_remote *nb, int pages,
-                           struct niobuf_local *res, struct inode *inode)
+void filter_release_cache(struct obd_device *obd, struct obd_ioobj *obj,
+                          struct niobuf_remote *rnb, struct inode *inode)
 {
-        struct niobuf_remote *rnb;
         int i;
 
         LASSERT(inode != NULL);
+        for (i = 0; i < obj->ioo_bufcnt; i++, rnb++) {
 #ifdef HAVE_TRUNCATE_RANGE
-        for (i = 0, rnb = nb; i < obj->ioo_bufcnt; i++, rnb++) {
                 /* remove pages in which range is fit */
                 truncate_inode_pages_range(inode->i_mapping,
                                            rnb->offset & CFS_PAGE_MASK,
                                            (rnb->offset + rnb->len - 1) |
                                            ~CFS_PAGE_MASK);
-        }
-#elif (defined HAVE_TRUNCATE_COMPLETE)
-        for (i = 0, lnb = res; i < pages; i++, lnb++)
-                truncate_complete_page(inode->i_mapping, lnb->page);
 #else
-#error "Nor truncate_inode_pages_range or truncate_complete_page are supported"
+                /* no HAVE_TRUNCATE_RANGE: fall back to invalidating pages */
+                invalidate_mapping_pages(inode->i_mapping,
+                                         rnb->offset >> CFS_PAGE_SHIFT,
+                                         (rnb->offset + rnb->len) >>
+                                         CFS_PAGE_SHIFT);
 #endif
+        }
 }
 
 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
@@ -410,6 +383,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         int rc = 0, i, tot_bytes = 0;
         unsigned long now = jiffies;
         long timediff;
+        loff_t isize;
         ENTRY;
 
         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
@@ -418,7 +392,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         LASSERTF(objcount == 1, "%d\n", objcount);
         LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt);
 
-        rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa,
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+        rc = filter_auth_capa(exp, NULL, oa->o_gr, capa,
                               CAPA_OPC_OSS_READ);
         if (rc)
                 RETURN(rc);
@@ -445,6 +420,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         }
 
         inode = dentry->d_inode;
+        isize = i_size_read(inode);
 
         obdo_to_inode(inode, oa, OBD_MD_FLATIME);
 
@@ -460,7 +436,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
                 lnb->dentry = dentry;
 
-                if (i_size_read(inode) <= lnb->offset)
+                if (isize <= lnb->offset)
                         /* If there's no more data, abort early.  lnb->rc == 0,
                          * so it's easy to detect later. */
                         break;
@@ -471,8 +447,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
                 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_CACHE_ACCESS, 1);
 
-                if (i_size_read(inode) < lnb->offset + lnb->len - 1)
-                        lnb->rc = i_size_read(inode) - lnb->offset;
+                if (isize < lnb->offset + lnb->len - 1)
+                        lnb->rc = isize - lnb->offset;
                 else
                         lnb->rc = lnb->len;
 
@@ -684,12 +660,14 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
 
-        rc = filter_auth_capa(exp, NULL, obdo_mdsno(oa), capa,
+        LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+        rc = filter_auth_capa(exp, NULL, oa->o_gr, capa,
                               CAPA_OPC_OSS_WRITE);
         if (rc)
                 RETURN(rc);
 
-        if (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self) 
+        if (exp->exp_connection &&
+            exp->exp_connection->c_peer.nid == exp->exp_connection->c_self)
                 localreq = 1;
 
         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
@@ -712,7 +690,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
 
         if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) &&
             dentry->d_inode->i_mode & (S_ISUID | S_ISGID)) {
-                rc = filter_capa_fixoa(exp, oa, obdo_mdsno(oa), capa);
+                rc = filter_capa_fixoa(exp, oa, oa->o_gr, capa);
                 if (rc)
                         GOTO(cleanup, rc);
         }
@@ -752,7 +730,8 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         /* do not zero out oa->o_valid as it is used in filter_commitrw_write()
          * for setting UID/GID and fid EA in first write time. */
         if (oa->o_valid & OBD_MD_FLGRANT)
-                oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
+                oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty,
+                                           left, 1);
 
         spin_unlock(&obd->obd_osfs_lock);
         filter_fmd_put(exp, fmd);
@@ -790,8 +769,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                  * be able to proceed in filter_commitrw_write(). thus let's
                  * just wait for writeout completion, should be rare enough.
                  * -bzzz */
-                if (obd->u.filter.fo_writethrough_cache)
-                        wait_on_page_writeback(lnb->page);
+                wait_on_page_writeback(lnb->page);
                 BUG_ON(PageWriteback(lnb->page));
 
                 /* If the filter writes a partial page, then has the file
@@ -920,7 +898,7 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
 
                 if (resource != NULL) {
                         LDLM_RESOURCE_ADDREF(resource);
-                        ns->ns_lvbo->lvbo_update(resource, NULL, 0, 1);
+                        ns->ns_lvbo->lvbo_update(resource, NULL, 1);
                         LDLM_RESOURCE_DELREF(resource);
                         ldlm_resource_putref(resource);
                 }
@@ -929,17 +907,15 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
         if (res->dentry != NULL)
                 inode = res->dentry->d_inode;
 
-        for (i = 0, lnb = res; i < npages; i++, lnb++)
-                if (lnb->page != NULL)
+        for (i = 0, lnb = res; i < npages; i++, lnb++) {
+                if (lnb->page != NULL) {
                         page_cache_release(lnb->page);
-
+                        lnb->page = NULL;
+                }
+        }
         if (inode && (fo->fo_read_cache == 0 ||
                       i_size_read(inode) > fo->fo_readcache_max_filesize))
-                filter_truncate_cache(exp->exp_obd, obj, rnb, npages, res,
-                                      inode);
-
-        for (i = 0, lnb = res; i < npages; i++, lnb++)
-                lnb->page = NULL;
+                filter_release_cache(exp->exp_obd, obj, rnb, inode);
 
         if (res->dentry != NULL)
                 f_dput(res->dentry);