Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
index 74d5aa8..ea2721e 100644 (file)
@@ -59,7 +59,7 @@ int *obdfilter_created_scratchpad;
 /* Grab the dirty and seen grant announcements from the incoming obdo.
  * We will later calculate the clients new grant and return it.
  * Caller must hold osfs lock */
-static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
+void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
 {
         struct filter_export_data *fed;
         struct obd_device *obd = exp->exp_obd;
@@ -108,6 +108,26 @@ static void filter_grant_incoming(struct obd_export *exp, struct obdo *oa)
         obd->u.filter.fo_tot_granted -= oa->o_dropped;
         fed->fed_grant -= oa->o_dropped;
         fed->fed_dirty = oa->o_dirty;
+
+        if (oa->o_flags & OBD_FL_SHRINK_GRANT) {
+                obd_size left_space = filter_grant_space_left(exp);
+                struct filter_obd *filter = &exp->exp_obd->u.filter;
+
+                /* Only if left_space < fo_tot_granted_clients * 32M
+                 * can the grant space be shrunk */
+                if (left_space < filter->fo_tot_granted_clients * 
+                                 FILTER_GRANT_SHRINK_LIMIT) { 
+                        fed->fed_grant -= oa->o_grant;
+                        filter->fo_tot_granted -= oa->o_grant;
+                        CDEBUG(D_CACHE, "%s: cli %s/%p shrink "LPU64
+                               "fed_grant %ld total "LPU64"\n",
+                               obd->obd_name, exp->exp_client_uuid.uuid,
+                               exp, oa->o_grant, fed->fed_grant,
+                               filter->fo_tot_granted);
+                        oa->o_grant = 0;
+                }
+        }
+
         if (fed->fed_dirty < 0 || fed->fed_grant < 0 || fed->fed_pending < 0) {
                 CERROR("%s: cli %s/%p dirty %ld pend %ld grant %ld\n",
                        obd->obd_name, exp->exp_client_uuid.uuid, exp,
@@ -244,21 +264,26 @@ long filter_grant(struct obd_export *exp, obd_size current_grant,
 /*
  * the routine is used to request pages from pagecache
  *
- * use GFP_NOFS not allowing to enter FS as the client can run on this node
- * and we might end waiting on a page he sent in the request we're serving.
+ * use GFP_NOFS for requests from a local client not allowing to enter FS
+ * as we might end up waiting on a page he sent in the request we're serving.
+ * use __GFP_HIGHMEM so that the pages can use all of the available memory
+ * on 32-bit machines
+ * use more aggressive GFP_HIGHUSER flags for non-local clients to be able to
+ * generate more memory pressure, but at the same time use __GFP_NOMEMALLOC
+ * in order not to exhaust emergency reserves.
  *
- * use NORETRY so that the allocator doesn't go crazy: chance to more lucky
- * thread have enough memory to complete his request. for our request client
- * will do resend hopefully -bzzz
+ * See Bug 19529 and Bug 19917 for details. 
  */
-static struct page * filter_get_page(struct obd_device *obd,
-                                     struct inode *inode,
-                                     obd_off offset)
+static struct page *filter_get_page(struct obd_device *obd,
+                                    struct inode *inode,
+                                    obd_off offset,
+                                    int localreq)
 {
         struct page *page;
 
         page = find_or_create_page(inode->i_mapping, offset >> CFS_PAGE_SHIFT,
-                                   GFP_NOFS | __GFP_NORETRY);
+                                   (localreq ? (GFP_NOFS | __GFP_HIGHMEM) 
+                                             : (GFP_HIGHUSER | __GFP_NOMEMALLOC)));
         if (unlikely(page == NULL))
                 lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_NO_PAGE, 1);
 
@@ -328,8 +353,11 @@ void filter_invalidate_cache(struct obd_device *obd, struct obd_ioobj *obj,
         LASSERT(inode != NULL);
 
         for (i = 0, rnb = nb; i < obj->ioo_bufcnt; i++, rnb++) {
-                obd_off start = rnb->offset >> CFS_PAGE_SHIFT;
-                obd_off end = (rnb->offset + rnb->len) >> CFS_PAGE_SHIFT;
+                obd_off start;
+                obd_off end;
+
+                start = rnb->offset >> CFS_PAGE_SHIFT;
+                end = (rnb->offset + rnb->len) >> CFS_PAGE_SHIFT;
                 invalidate_mapping_pages(inode->i_mapping, start, end);
                 /* just to avoid warnings */
                 start = 0;
@@ -337,6 +365,34 @@ void filter_invalidate_cache(struct obd_device *obd, struct obd_ioobj *obj,
         }
 }
 
+/*
+ * Drop the pages used by a read request from the page cache.
+ * The invalidate above doesn't work during read because lnet pins pages,
+ * so truncate is used here instead.
+ */
+void filter_truncate_cache(struct obd_device *obd, struct obd_ioobj *obj,
+                           struct niobuf_remote *nb, int pages,
+                           struct niobuf_local *res, struct inode *inode)
+{
+        int i;
+
+        LASSERT(inode != NULL);
+#ifdef HAVE_TRUNCATE_RANGE
+        /* remove every page that a remote buffer's byte range touches */
+        for (i = 0; i < obj->ioo_bufcnt; i++)
+                truncate_inode_pages_range(inode->i_mapping,
+                                           nb[i].offset & CFS_PAGE_MASK,
+                                           (nb[i].offset + nb[i].len - 1) |
+                                           ~CFS_PAGE_MASK);
+#elif (defined HAVE_TRUNCATE_COMPLETE)
+        for (i = 0; i < pages; i++)
+                if (res[i].page != NULL)
+                        truncate_complete_page(inode->i_mapping, res[i].page);
+#else
+#error "Nor truncate_inode_pages_range or truncate_complete_page are supported"
+#endif
+}
+
 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                               int objcount, struct obd_ioobj *obj,
                               struct niobuf_remote *nb,
@@ -345,7 +401,6 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                               struct lustre_capa *capa)
 {
         struct obd_device *obd = exp->exp_obd;
-        struct filter_obd *fo = &obd->u.filter;
         struct timeval start, end;
         struct lvfs_run_ctxt saved;
         struct niobuf_local *lnb;
@@ -372,7 +427,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 spin_lock(&obd->obd_osfs_lock);
                 filter_grant_incoming(exp, oa);
 
-                oa->o_grant = 0;
+                if (!(oa->o_flags & OBD_FL_SHRINK_GRANT))
+                        oa->o_grant = 0;
                 spin_unlock(&obd->obd_osfs_lock);
         }
 
@@ -409,7 +465,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                          * so it's easy to detect later. */
                         break;
 
-                lnb->page = filter_get_page(obd, inode, lnb->offset);
+                lnb->page = filter_get_page(obd, inode, lnb->offset, 0);
                 if (lnb->page == NULL)
                         GOTO(cleanup, rc = -ENOMEM);
 
@@ -467,10 +523,6 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 }
         }
 
-        if (inode && (fo->fo_read_cache == 0 ||
-                        i_size_read(inode) > fo->fo_readcache_max_filesize))
-                filter_invalidate_cache(obd, obj, nb, inode);
-
         if (rc != 0) {
                 if (dentry != NULL)
                         f_dput(dentry);
@@ -627,7 +679,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         void *iobuf;
         obd_size left;
         unsigned long now = jiffies, timediff;
-        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
+        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0, localreq = 0;
         ENTRY;
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
@@ -637,13 +689,16 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         if (rc)
                 RETURN(rc);
 
-        push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        iobuf = filter_iobuf_get(&exp->exp_obd->u.filter, oti);
+        if (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self) 
+                localreq = 1;
+
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        iobuf = filter_iobuf_get(&obd->u.filter, oti);
         if (IS_ERR(iobuf))
                 GOTO(cleanup, rc = PTR_ERR(iobuf));
         cleanup_phase = 1;
 
-        dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
+        dentry = filter_fid2dentry(obd, NULL, obj->ioo_gr,
                                    obj->ioo_id);
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
@@ -651,15 +706,22 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
 
         if (dentry->d_inode == NULL) {
                 CERROR("%s: trying to BRW to non-existent file "LPU64"\n",
-                       exp->exp_obd->obd_name, obj->ioo_id);
+                       obd->obd_name, obj->ioo_id);
                 GOTO(cleanup, rc = -ENOENT);
         }
 
+        if (oa->o_valid & (OBD_MD_FLUID | OBD_MD_FLGID) &&
+            dentry->d_inode->i_mode & (S_ISUID | S_ISGID)) {
+                rc = filter_capa_fixoa(exp, oa, obdo_mdsno(oa), capa);
+                if (rc)
+                        GOTO(cleanup, rc);
+        }
+
         rc = filter_map_remote_to_local(objcount, obj, nb, npages, res);
         if (rc)
                 GOTO(cleanup, rc);
 
-        fsfilt_check_slow(exp->exp_obd, now, "preprw_write setup");
+        fsfilt_check_slow(obd, now, "preprw_write setup");
 
         /* Don't update inode timestamps if this write is older than a
          * setattr which modifies the timestamps. b=10150 */
@@ -669,7 +731,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         fmd = filter_fmd_find(exp, obj->ioo_id, obj->ioo_gr);
 
         LASSERT(oa != NULL);
-        spin_lock(&exp->exp_obd->obd_osfs_lock);
+        spin_lock(&obd->obd_osfs_lock);
         filter_grant_incoming(exp, oa);
         if (fmd && fmd->fmd_mactime_xid > oti->oti_xid)
                 oa->o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLCTIME |
@@ -692,11 +754,20 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         if (oa->o_valid & OBD_MD_FLGRANT)
                 oa->o_grant = filter_grant(exp,oa->o_grant,oa->o_undirty,left);
 
-        spin_unlock(&exp->exp_obd->obd_osfs_lock);
+        spin_unlock(&obd->obd_osfs_lock);
         filter_fmd_put(exp, fmd);
 
         if (rc)
                 GOTO(cleanup, rc);
+        cleanup_phase = 4;
+
+        /* Filter truncate first locks i_mutex and then the partially
+         * truncated page; filter write code first locks pages and then
+         * takes i_mutex.  To avoid a deadlock in case of concurrent
+         * punch/write requests from one client, filter writes and
+         * filter truncates are serialized by i_alloc_sem, allowing
+         * multiple writes or a single truncate. */
+        down_read(&dentry->d_inode->i_alloc_sem);
 
         do_gettimeofday(&start);
         for (i = 0, lnb = res; i < *npages; i++, lnb++) {
@@ -706,10 +777,10 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                  * needs to keep the pages all aligned properly. */
                 lnb->dentry = dentry;
 
-                lnb->page = filter_get_page(obd, dentry->d_inode, lnb->offset);
+                lnb->page = filter_get_page(obd, dentry->d_inode, lnb->offset,
+                                            localreq);
                 if (lnb->page == NULL)
                         GOTO(cleanup, rc = -ENOMEM);
-                cleanup_phase = 4;
 
                 /* DLM locking protects us from write and truncate competing
                  * for same region, but truncate can leave dirty page in the
@@ -739,7 +810,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                                LPU64" flg %x before EOF %llu\n",
                                                lnb->len, lnb->offset,lnb->flags,
                                                i_size_read(dentry->d_inode));
-                                filter_iobuf_add_page(exp->exp_obd, iobuf,
+                                filter_iobuf_add_page(obd, iobuf,
                                                       dentry->d_inode,
                                                       lnb->page);
                         } else {
@@ -769,7 +840,7 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
                               NULL, NULL, NULL);
 
-        fsfilt_check_slow(exp->exp_obd, now, "start_page_write");
+        fsfilt_check_slow(obd, now, "start_page_write");
 
         if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats)
                 lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
@@ -786,22 +857,23 @@ cleanup:
                                         lnb->page = NULL;
                                 }
                         }
+                        up_read(&dentry->d_inode->i_alloc_sem);
                 }
         case 3:
-                filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti);
+                filter_iobuf_put(&obd->u.filter, iobuf, oti);
         case 2:
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 if (rc)
                         f_dput(dentry);
                 break;
         case 1:
-                filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti);
+                filter_iobuf_put(&obd->u.filter, iobuf, oti);
         case 0:
-                spin_lock(&exp->exp_obd->obd_osfs_lock);
+                spin_lock(&obd->obd_osfs_lock);
                 if (oa)
                         filter_grant_incoming(exp, oa);
-                spin_unlock(&exp->exp_obd->obd_osfs_lock);
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                spin_unlock(&obd->obd_osfs_lock);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 break;
         default:;
         }
@@ -830,6 +902,7 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
                                 int npages, struct niobuf_local *res,
                                 struct obd_trans_info *oti, int rc)
 {
+        struct filter_obd *fo = &exp->exp_obd->u.filter;
         struct inode *inode = NULL;
         struct ldlm_res_id res_id;
         struct ldlm_resource *resource = NULL;
@@ -846,7 +919,9 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
                 resource = ldlm_resource_get(ns, NULL, &res_id, LDLM_EXTENT, 0);
 
                 if (resource != NULL) {
+                        LDLM_RESOURCE_ADDREF(resource);
                         ns->ns_lvbo->lvbo_update(resource, NULL, 0, 1);
+                        LDLM_RESOURCE_DELREF(resource);
                         ldlm_resource_putref(resource);
                 }
         }
@@ -854,12 +929,17 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
         if (res->dentry != NULL)
                 inode = res->dentry->d_inode;
 
-        for (i = 0, lnb = res; i < npages; i++, lnb++) {
-                if (lnb->page != NULL) {
+        for (i = 0, lnb = res; i < npages; i++, lnb++)
+                if (lnb->page != NULL)
                         page_cache_release(lnb->page);
-                        lnb->page = NULL;
-                }
-        }
+
+        if (inode && (fo->fo_read_cache == 0 ||
+                      i_size_read(inode) > fo->fo_readcache_max_filesize))
+                filter_truncate_cache(exp->exp_obd, obj, rnb, npages, res,
+                                      inode);
+
+        for (i = 0, lnb = res; i < npages; i++, lnb++)
+                lnb->page = NULL;
 
         if (res->dentry != NULL)
                 f_dput(res->dentry);