Whamcloud - gitweb
LU-6401 headers: Create a header for obdo related functions
[fs/lustre-release.git] / lustre / osp / osp_object.c
index ff6107e..96e3135 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * lustre/osp/osp_object.c
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <lustre_obdo.h>
+#include <lustre_swab.h>
+
 #include "osp_internal.h"
 
 static inline __u32 osp_dev2node(struct osp_device *osp)
@@ -512,6 +515,7 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt)
        mutex_lock(&osp->opd_async_requests_mutex);
        rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL,
                                      &obj->opo_ooa->ooa_attr,
+                                     sizeof(struct obdo),
                                      osp_attr_get_interpterer);
        mutex_unlock(&osp->opd_async_requests_mutex);
 
@@ -552,7 +556,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
 
        if (obj->opo_ooa != NULL) {
                spin_lock(&obj->opo_lock);
-               if (obj->opo_ooa->ooa_attr.la_valid != 0) {
+               if (obj->opo_ooa->ooa_attr.la_valid != 0 && !obj->opo_stale) {
                        *attr = obj->opo_ooa->ooa_attr;
                        spin_unlock(&obj->opo_lock);
 
@@ -601,7 +605,12 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0)
                GOTO(out, rc);
 
-       GOTO(out, rc = 0);
+       spin_lock(&obj->opo_lock);
+       if (obj->opo_stale)
+               obj->opo_stale = 0;
+       spin_unlock(&obj->opo_lock);
+
+       GOTO(out, rc);
 
 out:
        if (req != NULL)
@@ -851,25 +860,28 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
 
        mutex_lock(&osp->opd_async_requests_mutex);
        rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1,
-                                     &namelen, (const void **)&name, oxe,
+                                     &namelen, (const void **)&name,
+                                     oxe, buf->lb_len,
                                      osp_xattr_get_interpterer);
        if (rc != 0) {
                mutex_unlock(&osp->opd_async_requests_mutex);
                osp_oac_xattr_put(oxe);
        } else {
-               struct osp_update_request *update;
+               struct osp_update_request *our;
+               struct osp_update_request_sub *ours;
 
                /* XXX: Currently, we trigger the batched async OUT
                 *      RPC via dt_declare_xattr_get(). It is not
                 *      perfect solution, but works well now.
                 *
                 *      We will improve it in the future. */
-               update = osp->opd_async_requests;
-               if (update != NULL && update->our_req != NULL &&
-                   update->our_req->ourq_count > 0) {
+               our = osp->opd_async_requests;
+               ours = osp_current_object_update_request(our);
+               if (ours != NULL && ours->ours_req != NULL &&
+                   ours->ours_req->ourq_count > 0) {
                        osp->opd_async_requests = NULL;
                        mutex_unlock(&osp->opd_async_requests_mutex);
-                       rc = osp_unplug_async_request(env, osp, update);
+                       rc = osp_unplug_async_request(env, osp, our);
                } else {
                        mutex_unlock(&osp->opd_async_requests_mutex);
                }
@@ -933,38 +945,41 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
        if (unlikely(obj->opo_non_exist))
                RETURN(-ENOENT);
 
-       oxe = osp_oac_xattr_find(obj, name, false);
-       if (oxe != NULL) {
-               spin_lock(&obj->opo_lock);
-               if (oxe->oxe_ready) {
-                       if (!oxe->oxe_exist)
-                               GOTO(unlock, rc = -ENODATA);
+       /* Only cache xattr for OST object */
+       if (!osp->opd_connect_mdt) {
+               oxe = osp_oac_xattr_find(obj, name, false);
+               if (oxe != NULL) {
+                       spin_lock(&obj->opo_lock);
+                       if (oxe->oxe_ready) {
+                               if (!oxe->oxe_exist)
+                                       GOTO(unlock, rc = -ENODATA);
 
-                       if (buf->lb_buf == NULL)
-                               GOTO(unlock, rc = oxe->oxe_vallen);
+                               if (buf->lb_buf == NULL)
+                                       GOTO(unlock, rc = oxe->oxe_vallen);
 
-                       if (buf->lb_len < oxe->oxe_vallen)
-                               GOTO(unlock, rc = -ERANGE);
+                               if (buf->lb_len < oxe->oxe_vallen)
+                                       GOTO(unlock, rc = -ERANGE);
 
-                       memcpy(buf->lb_buf, oxe->oxe_value, oxe->oxe_vallen);
+                               memcpy(buf->lb_buf, oxe->oxe_value,
+                                      oxe->oxe_vallen);
 
-                       GOTO(unlock, rc = oxe->oxe_vallen);
+                               GOTO(unlock, rc = oxe->oxe_vallen);
 
 unlock:
-                       spin_unlock(&obj->opo_lock);
-                       osp_oac_xattr_put(oxe);
+                               spin_unlock(&obj->opo_lock);
+                               osp_oac_xattr_put(oxe);
 
-                       return rc;
+                               return rc;
+                       }
+                       spin_unlock(&obj->opo_lock);
                }
-               spin_unlock(&obj->opo_lock);
        }
-
        update = osp_update_request_create(dev);
        if (IS_ERR(update))
                GOTO(out, rc = PTR_ERR(update));
 
        rc = osp_update_rpc_pack(env, xattr_get, update, OUT_XATTR_GET,
-                                lu_object_fid(&dt->do_lu), name);
+                                lu_object_fid(&dt->do_lu), name, buf->lb_len);
        if (rc != 0) {
                CERROR("%s: Insert update error "DFID": rc = %d\n",
                       dname, PFID(lu_object_fid(&dt->do_lu)), rc);
@@ -972,7 +987,7 @@ unlock:
        }
 
        rc = osp_remote_sync(env, osp, update, &req);
-       if (rc != 0) {
+       if (rc < 0) {
                if (rc == -ENOENT) {
                        dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
                        obj->opo_non_exist = 1;
@@ -1020,14 +1035,13 @@ unlock:
                GOTO(out, rc);
 
        if (buf->lb_buf == NULL)
-               GOTO(out, rc = rbuf->lb_len);
+               GOTO(out, rc);
 
        if (unlikely(buf->lb_len < rbuf->lb_len))
                GOTO(out, rc = -ERANGE);
 
        memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len);
-       rc = rbuf->lb_len;
-       if (obj->opo_ooa == NULL)
+       if (obj->opo_ooa == NULL || osp->opd_connect_mdt)
                GOTO(out, rc);
 
        if (oxe == NULL) {
@@ -1141,6 +1155,7 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
                  struct thandle *th)
 {
        struct osp_object       *o = dt2osp_obj(dt);
+       struct osp_device       *osp = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_update_request *update;
        struct osp_xattr_entry  *oxe;
        int                     rc;
@@ -1154,7 +1169,7 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
 
        rc = osp_update_rpc_pack(env, xattr_set, update, OUT_XATTR_SET,
                                 lu_object_fid(&dt->do_lu), buf, name, fl);
-       if (rc != 0 || o->opo_ooa == NULL)
+       if (rc != 0 || o->opo_ooa == NULL || osp->opd_connect_mdt)
                RETURN(rc);
 
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
@@ -1682,6 +1697,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
        struct lu_device         *dev   = it->ooi_obj->do_lu.lo_dev;
        struct osp_device        *osp   = lu2osp_dev(dev);
        struct page             **pages;
+       struct lu_device *top_device;
        struct ptlrpc_request    *req   = NULL;
        struct ptlrpc_bulk_desc  *desc;
        struct idx_info          *ii;
@@ -1717,6 +1733,13 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
                RETURN(rc);
        }
 
+       /* Let's allow this request during recovery, otherwise
+        * if the remote target is also in recovery status,
+        * it might cause deadlock */
+       top_device = dev->ld_site->ls_top_dev;
+       if (top_device->ld_obd->obd_recovering)
+               req->rq_allow_replay = 1;
+
        req->rq_request_portal = OUT_PORTAL;
        ii = req_capsule_client_get(&req->rq_pill, &RMF_IDX_INFO);
        memset(ii, 0, sizeof(*ii));
@@ -1741,15 +1764,18 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
 
        ptlrpc_at_set_req_timeout(req);
 
-       desc = ptlrpc_prep_bulk_imp(req, npages, 1, BULK_PUT_SINK,
-                                   MDS_BULK_PORTAL);
+       desc = ptlrpc_prep_bulk_imp(req, npages, 1,
+                                   PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+                                   MDS_BULK_PORTAL,
+                                   &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL) {
                ptlrpc_request_free(req);
                RETURN(-ENOMEM);
        }
 
        for (i = 0; i < npages; i++)
-               ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
+               desc->bd_frag_ops->add_kiov_frag(desc, pages[i], 0,
+                                                PAGE_CACHE_SIZE);
 
        ptlrpc_request_set_replen(req);
        rc = ptlrpc_queue_wait(req);
@@ -2115,13 +2141,18 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
 
                po->opo_obj.do_ops = &osp_md_obj_ops;
                po->opo_obj.do_body_ops = &osp_md_body_ops;
-               rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o), la);
-               if (rc == 0)
-                       o->lo_header->loh_attr |=
-                               LOHA_EXISTS | (la->la_mode & S_IFMT);
-               if (rc == -ENOENT) {
+               if (conf != NULL && conf->loc_flags & LOC_F_NEW) {
                        po->opo_non_exist = 1;
-                       rc = 0;
+               } else {
+                       rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
+                                                            la);
+                       if (rc == 0)
+                               o->lo_header->loh_attr |=
+                                       LOHA_EXISTS | (la->la_mode & S_IFMT);
+                       if (rc == -ENOENT) {
+                               po->opo_non_exist = 1;
+                               rc = 0;
+                       }
                }
                init_rwsem(&po->opo_sem);
        }