Whamcloud - gitweb
LU-7521 ldlm: LDLM_DEBUG() shouldn't be passed \n
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
index 69c815c..21ccacf 100644 (file)
@@ -53,6 +53,7 @@
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <llog_swab.h>
 #include <lustre_log.h>
 #include "osp_internal.h"
 
@@ -885,40 +886,37 @@ static int osp_md_object_lock(const struct lu_env *env,
        struct ldlm_res_id      *res_id;
        struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
        struct osp_device       *osp = dt2osp_dev(dt_dev);
+       struct lu_device        *top_device;
        struct ptlrpc_request   *req;
        int                     rc = 0;
-       __u64                   flags = 0;
-       enum ldlm_mode          mode;
+       __u64                   flags = LDLM_FL_NO_LRU;
 
        res_id = einfo->ei_res_id;
        LASSERT(res_id != NULL);
 
-       mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
-                              LDLM_FL_BLOCK_GRANTED, res_id,
-                              einfo->ei_type, policy,
-                              einfo->ei_mode, lh, 0);
-       if (mode > 0)
-               return ELDLM_OK;
-
        if (einfo->ei_nonblock)
                flags |= LDLM_FL_BLOCK_NOWAIT;
+       if (einfo->ei_mode & (LCK_EX | LCK_PW))
+               flags |= LDLM_FL_COS_INCOMPAT;
 
        req = ldlm_enqueue_pack(osp->opd_exp, 0);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));
 
+       /* During recovery, it needs to let OSP send enqueue
+        * without checking recoverying status, in case the
+        * other target is being recovered at the same time,
+        * and if we wait here for the import to be recovered,
+        * it might cause deadlock */
+       top_device = dt_dev->dd_lu_dev.ld_site->ls_top_dev;
+       if (top_device->ld_obd->obd_recovering)
+               req->rq_allow_replay = 1;
+
        rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
                              (const union ldlm_policy_data *)policy,
                              &flags, NULL, 0, LVB_T_NONE, lh, 0);
 
        ptlrpc_req_finished(req);
-       if (rc == ELDLM_OK) {
-               struct ldlm_lock *lock;
-
-               lock = __ldlm_handle2lock(lh, 0);
-               ldlm_set_cbpending(lock);
-               LDLM_LOCK_PUT(lock);
-       }
 
        return rc == ELDLM_OK ? 0 : -EIO;
 }
@@ -1057,7 +1055,20 @@ static ssize_t osp_md_declare_write(const struct lu_env *env,
                                    const struct lu_buf *buf,
                                    loff_t pos, struct thandle *th)
 {
-       return osp_trans_update_request_create(th);
+       struct osp_device *osp = dt2osp_dev(th->th_dev);
+       int rc;
+
+       rc = osp_trans_update_request_create(th);
+       if (rc != 0)
+               return rc;
+
+       if (osp->opd_update == NULL)
+               return 0;
+
+       if (dt2osp_obj(dt)->opo_stale)
+               return -ESTALE;
+
+       return 0;
 }
 
 /**
@@ -1089,13 +1100,17 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
        update = thandle_to_osp_update_request(th);
        LASSERT(update != NULL);
 
+       CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n",
+              PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len);
+
        rc = osp_update_rpc_pack(env, write, update, OUT_WRITE,
                                 lu_object_fid(&dt->do_lu), buf, *pos);
        if (rc < 0)
                RETURN(rc);
 
-       CDEBUG(D_INFO, "write "DFID" offset = "LPU64" length = %zu\n",
-              PFID(lu_object_fid(&dt->do_lu)), *pos, buf->lb_len);
+       rc = osp_check_and_set_rpc_version(oth, obj);
+       if (rc < 0)
+               RETURN(rc);
 
        /* XXX: how about the write error happened later? */
        *pos += buf->lb_len;
@@ -1105,10 +1120,6 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
            obj->opo_ooa->ooa_attr.la_size < *pos)
                obj->opo_ooa->ooa_attr.la_size = *pos;
 
-       rc = osp_check_and_set_rpc_version(oth);
-       if (rc < 0)
-               RETURN(rc);
-
        RETURN(buf->lb_len);
 }
 
@@ -1119,7 +1130,7 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
        struct dt_device *dt_dev        = &osp->opd_dt_dev;
        struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2;
        char *ptr = rbuf->lb_buf;
-       struct osp_update_request *update = NULL;
+       struct osp_update_request *update;
        struct ptlrpc_request *req = NULL;
        struct out_read_reply *orr;
        struct ptlrpc_bulk_desc *desc;
@@ -1135,7 +1146,7 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
         * update_remote list of the thandle.  */
        update = osp_update_request_create(dt_dev);
        if (IS_ERR(update))
-               GOTO(out, rc = PTR_ERR(update));
+               RETURN(PTR_ERR(update));
 
        rc = osp_update_rpc_pack(env, read, update, OUT_READ,
                                 lu_object_fid(&dt->do_lu),
@@ -1143,13 +1154,16 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
        if (rc != 0) {
                CERROR("%s: cannot insert update: rc = %d\n",
                       dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
-               GOTO(out, rc);
+               GOTO(out_update, rc);
        }
 
+       CDEBUG(D_INFO, "%s "DFID" read offset %llu size %zu\n",
+              dt_dev->dd_lu_dev.ld_obd->obd_name,
+              PFID(lu_object_fid(&dt->do_lu)), *pos, rbuf->lb_len);
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update,
                                 &req);
        if (rc != 0)
-               GOTO(out, rc);
+               GOTO(out_update, rc);
 
        nbufs = (rbuf->lb_len + OUT_BULK_BUFFER_SIZE - 1) /
                                        OUT_BULK_BUFFER_SIZE;
@@ -1210,11 +1224,10 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
        rc = orr->orr_size;
        *pos = orr->orr_offset;
 out:
-       if (req != NULL)
-               ptlrpc_req_finished(req);
+       ptlrpc_req_finished(req);
 
-       if (update != NULL)
-               osp_update_request_destroy(update);
+out_update:
+       osp_update_request_destroy(update);
 
        RETURN(rc);
 }