Whamcloud - gitweb
LU-9045 osp: Revert "LU-8840 osp: handle EA cache properly"
[fs/lustre-release.git] / lustre / osp / osp_object.c
index 91367d2..1fcbac7 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -27,7 +23,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2015, Intel Corporation.
+ * Copyright (c) 2012, 2016, Intel Corporation.
  */
 /*
  * lustre/osp/osp_object.c
@@ -775,12 +771,14 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
        struct osp_object       *obj     = dt2osp_obj(dt);
        struct osp_device       *osp     = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_xattr_entry  *oxe;
-       __u16                    namelen = strlen(name);
+       __u16 namelen;
        int                      rc      = 0;
 
        LASSERT(buf != NULL);
        LASSERT(name != NULL);
 
+       namelen = strlen(name);
+
        /* If only for xattr size, return directly. */
        if (unlikely(buf->lb_len == 0))
                return 0;
@@ -1094,6 +1092,16 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0)
                RETURN(rc);
 
+       /* Do not cache linkEA that may be self-adjusted by peers
+        * under EA overflow case. */
+       if (strcmp(name, XATTR_NAME_LINK) == 0) {
+               oxe = osp_oac_xattr_find(o, name, true);
+               if (oxe != NULL)
+                       osp_oac_xattr_put(oxe);
+
+               RETURN(0);
+       }
+
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
        if (oxe == NULL) {
                CWARN("%s: cannot cache xattr '%s' of "DFID"\n",
@@ -1204,6 +1212,21 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
        return 0;
 }
 
+void osp_obj_invalidate_cache(struct osp_object *obj)
+{
+       struct osp_xattr_entry *oxe;
+       struct osp_xattr_entry *tmp;
+
+       spin_lock(&obj->opo_lock);
+       list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
+               oxe->oxe_ready = 0;
+               list_del_init(&oxe->oxe_list);
+               osp_oac_xattr_put(oxe);
+       }
+       obj->opo_attr.la_valid = 0;
+       spin_unlock(&obj->opo_lock);
+}
+
 /**
  * Implement OSP layer dt_object_operations::do_invalidate() interface.
  *
@@ -1218,17 +1241,13 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
 int osp_invalidate(const struct lu_env *env, struct dt_object *dt)
 {
        struct osp_object *obj = dt2osp_obj(dt);
-       struct osp_xattr_entry *oxe;
-       struct osp_xattr_entry *tmp;
        ENTRY;
 
+       CDEBUG(D_HA, "Invalidate osp_object "DFID"\n",
+              PFID(lu_object_fid(&dt->do_lu)));
+       osp_obj_invalidate_cache(obj);
+
        spin_lock(&obj->opo_lock);
-       list_for_each_entry_safe(oxe, tmp, &obj->opo_xattr_list, oxe_list) {
-               oxe->oxe_ready = 0;
-               list_del_init(&oxe->oxe_list);
-               osp_oac_xattr_put(oxe);
-       }
-       obj->opo_attr.la_valid = 0;
        obj->opo_stale = 1;
        spin_unlock(&obj->opo_lock);
 
@@ -1650,7 +1669,6 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
        struct lu_device         *dev   = it->ooi_obj->do_lu.lo_dev;
        struct osp_device        *osp   = lu2osp_dev(dev);
        struct page             **pages;
-       struct lu_device *top_device;
        struct ptlrpc_request    *req   = NULL;
        struct ptlrpc_bulk_desc  *desc;
        struct idx_info          *ii;
@@ -1661,7 +1679,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
 
        /* 1MB bulk */
        npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20);
-       npages /= PAGE_CACHE_SIZE;
+       npages /= PAGE_SIZE;
 
        OBD_ALLOC(pages, npages * sizeof(*pages));
        if (pages == NULL)
@@ -1670,7 +1688,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
        it->ooi_pages = pages;
        it->ooi_total_npages = npages;
        for (i = 0; i < npages; i++) {
-               pages[i] = alloc_page(GFP_IOFS);
+               pages[i] = alloc_page(GFP_NOFS);
                if (pages[i] == NULL)
                        RETURN(-ENOMEM);
        }
@@ -1686,13 +1704,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
                RETURN(rc);
        }
 
-       /* Let's allow this request during recovery, otherwise
-        * if the remote target is also in recovery status,
-        * it might cause deadlock */
-       top_device = dev->ld_site->ls_top_dev;
-       if (top_device->ld_obd->obd_recovering)
-               req->rq_allow_replay = 1;
-
+       osp_set_req_replay(osp, req);
        req->rq_request_portal = OUT_PORTAL;
        ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
        memset(ii, 0, sizeof(*ii));
@@ -1728,7 +1740,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
 
        for (i = 0; i < npages; i++)
                desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
-                                                PAGE_CACHE_SIZE);
+                                                PAGE_SIZE);
 
        ptlrpc_request_set_replen(req);
        rc = ptlrpc_queue_wait(req);
@@ -1746,7 +1758,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
                 GOTO(out, rc = -EPROTO);
 
        npages = (ii->ii_count + LU_PAGE_COUNT - 1) >>
-                (PAGE_CACHE_SHIFT - LU_PAGE_SHIFT);
+                (PAGE_SHIFT - LU_PAGE_SHIFT);
        if (npages > it->ooi_total_npages) {
                CERROR("%s: returned more pages than expected, %u > %u\n",
                       osp->opd_obd->obd_name, npages, it->ooi_total_npages);