Whamcloud - gitweb
- fixes in mdd about lockful and lockless versions of __mdd_lookup();
authoryury <yury>
Thu, 28 Sep 2006 16:37:15 +0000 (16:37 +0000)
committeryury <yury>
Thu, 28 Sep 2006 16:37:15 +0000 (16:37 +0000)
- some cleanups in mdt_open.c;
- fixes in fld about adding targets.
- rename md_data->create_mode to ->mode as it is also used in others than create requests.

13 files changed:
lustre/cmm/mdc_object.c
lustre/fld/fld_request.c
lustre/include/lustre/lustre_idl.h
lustre/ldlm/ldlm_lib.c
lustre/liblustre/file.c
lustre/llite/llite_lib.c
lustre/lmv/lmv_obd.c
lustre/mdc/mdc_lib.c
lustre/mdc/mdc_reint.c
lustre/mdd/mdd_handler.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_open.c
lustre/ptlrpc/client.c

index 58fea39..e1983c3 100644 (file)
@@ -351,7 +351,7 @@ static int mdc_ref_del(const struct lu_context *ctx, struct md_object *mo,
 
         mci = mdc_info_init(ctx);
         mci->mci_opdata.fid1 = *lu_object_fid(&mo->mo_lu);
-        mci->mci_opdata.create_mode = la->la_mode;
+        mci->mci_opdata.mode = la->la_mode;
         mci->mci_opdata.mod_time = la->la_ctime;
         if (uc &&
             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
@@ -419,7 +419,7 @@ static int mdc_rename_tgt(const struct lu_context *ctx, struct md_object *mo_p,
         mci = mdc_info_init(ctx);
         mci->mci_opdata.fid1 = *lu_object_fid(&mo_p->mo_lu);
         mci->mci_opdata.fid2 = *lf;
-        mci->mci_opdata.create_mode = la->la_mode;
+        mci->mci_opdata.mode = la->la_mode;
         mci->mci_opdata.mod_time = la->la_ctime;
         if (uc &&
             ((uc->mu_valid == UCRED_OLD) || (uc->mu_valid == UCRED_NEW))) {
index ab6d530..15ed7ec 100644 (file)
@@ -134,16 +134,16 @@ fld_client_get_target(struct lu_client_fld *fld,
 int fld_client_add_target(struct lu_client_fld *fld,
                           struct lu_fld_target *tar)
 {
-        const char *tar_name = fld_target_name(tar);
+        const char *name = fld_target_name(tar);
         struct lu_fld_target *target, *tmp;
         ENTRY;
 
         LASSERT(tar != NULL);
-        LASSERT(tar_name != NULL);
+        LASSERT(name != NULL);
         LASSERT(tar->ft_srv != NULL || tar->ft_exp != NULL);
 
         CDEBUG(D_INFO|D_WARNING, "%s: Adding target %s\n",
-              fld->lcf_name, tar_name);
+              fld->lcf_name, name);
 
         OBD_ALLOC_PTR(target);
         if (target == NULL)
@@ -151,13 +151,11 @@ int fld_client_add_target(struct lu_client_fld *fld,
 
         spin_lock(&fld->lcf_lock);
         list_for_each_entry(tmp, &fld->lcf_targets, ft_chain) {
-                const char *tmp_name = fld_target_name(tmp);
-                
-                if (strlen(tar_name) == strlen(tmp_name) &&
-                    strcmp(tmp_name, tar_name) == 0)
-                {
+                if (tmp->ft_idx == tar->ft_idx) {
                         spin_unlock(&fld->lcf_lock);
                         OBD_FREE_PTR(target);
+                        CERROR("Target %s exists in FLD and known as %s:#"LPU64"\n",
+                               name, fld_target_name(tmp), tmp->ft_idx);
                         RETURN(-EEXIST);
                 }
         }
@@ -178,7 +176,7 @@ int fld_client_add_target(struct lu_client_fld *fld,
 }
 EXPORT_SYMBOL(fld_client_add_target);
 
-/* remove export from FLD */
+/* Remove export from FLD */
 int fld_client_del_target(struct lu_client_fld *fld,
                           __u64 idx)
 {
index 63b8d6b..9e2e253 100644 (file)
@@ -604,7 +604,7 @@ struct md_op_data {
         __u64                 mod_time;
         const char           *name;
         int                   namelen;
-        __u32                 create_mode;
+        __u32                 mode;
         struct lmv_stripe_md *mea1;
         struct lmv_stripe_md *mea2;
         __u32                 suppgids[2];
index a98b71c..3cb39d6 100644 (file)
@@ -1371,7 +1371,7 @@ target_send_reply_msg(struct ptlrpc_request *req, int rc, int fail_id)
                         DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
                 req->rq_status = rc;
                 return (ptlrpc_error(req));
-        } 
+        }
 
         DEBUG_REQ(D_NET, req, "sending reply");
         return (ptlrpc_send_reply(req, 1));
index a8e2935..81ba673 100644 (file)
@@ -92,7 +92,7 @@ void llu_prep_md_op_data(struct md_op_data *op_data, struct inode *i1,
 
         op_data->name = name;
         op_data->namelen = namelen;
-        op_data->create_mode = mode;
+        op_data->mode = mode;
         op_data->mod_time = CURRENT_TIME;
 }
 
index 8c87b6a..5b470a2 100644 (file)
@@ -2245,7 +2245,7 @@ ll_prep_md_op_data(struct md_op_data *op_data, struct inode *i1,
 
         op_data->name = name;
         op_data->namelen = namelen;
-        op_data->create_mode = mode;
+        op_data->mode = mode;
         op_data->mod_time = CURRENT_SECONDS;
         op_data->fsuid = current->fsuid;
         op_data->fsgid = current->fsgid;
index ed8a84f..2c22726 100644 (file)
@@ -379,11 +379,12 @@ int lmv_connect_mdc(struct obd_device *obd, struct lmv_tgt_desc *tgt)
                 cluuid->uuid);
 
         if (!mdc_obd->obd_set_up) {
-                CERROR("target %s not set up\n", tgt->uuid.uuid);
+                CERROR("target %s is not set up\n", tgt->uuid.uuid);
                 RETURN(-EINVAL);
         }
 
-        rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid, &lmv->conn_data);
+        rc = obd_connect(NULL, &conn, mdc_obd, &lmv_mdc_uuid,
+                         &lmv->conn_data);
         if (rc) {
                 CERROR("target %s connect error %d\n", tgt->uuid.uuid, rc);
                 RETURN(rc);
@@ -2052,7 +2053,7 @@ static int lmv_unlink_slaves(struct obd_export *exp,
         for (i = 0; i < mea->mea_count; i++) {
                 memset(op_data2, 0, sizeof(*op_data2));
                 op_data2->fid1 = mea->mea_ids[i];
-                op_data2->create_mode = MDS_MODE_DONT_LOCK | S_IFDIR;
+                op_data2->mode = MDS_MODE_DONT_LOCK | S_IFDIR;
                 op_data2->fsuid = current->fsuid;
                 op_data2->fsgid = current->fsgid;
                 tgt_exp = lmv_get_export(lmv, &op_data2->fid1);
index 1b43f1a..06e58f9 100644 (file)
@@ -298,7 +298,7 @@ void mdc_unlink_pack(struct ptlrpc_request *req, int offset,
         rec->ul_fsuid = op_data->fsuid;//current->fsuid;
         rec->ul_fsgid = op_data->fsgid;//current->fsgid;
         rec->ul_cap = op_data->cap;//current->cap_effective;
-        rec->ul_mode = op_data->create_mode;
+        rec->ul_mode = op_data->mode;
         rec->ul_suppgid = op_data->suppgids[0];
         rec->ul_fid1 = op_data->fid1;
         rec->ul_fid2 = op_data->fid2;
@@ -355,7 +355,7 @@ void mdc_rename_pack(struct ptlrpc_request *req, int offset,
         rec->rn_fid1 = op_data->fid1;
         rec->rn_fid2 = op_data->fid2;
         rec->rn_time = op_data->mod_time;
-        rec->rn_mode = op_data->create_mode;
+        rec->rn_mode = op_data->mode;
 
         mdc_pack_capa(req, offset + 1, op_data->mod_capa1);
         mdc_pack_capa(req, offset + 2, op_data->mod_capa2);
index 69db90e..f86f5f9 100644 (file)
@@ -54,7 +54,7 @@ static int mdc_reint(struct ptlrpc_request *request,
         else if (!lustre_swab_repbuf(request, REPLY_REC_OFF,
                                      sizeof(struct mdt_body),
                                      lustre_swab_mdt_body)) {
-                CERROR ("Can't unpack mds_body\n");
+                CERROR ("Can't unpack mdt_body\n");
                 rc = -EPROTO;
         }
         return rc;
index b756fc9..53ba9ef 100644 (file)
@@ -55,10 +55,14 @@ static void __mdd_ref_add(const struct lu_context *ctxt, struct mdd_object *obj,
                           struct thandle *handle);
 static void __mdd_ref_del(const struct lu_context *ctxt, struct mdd_object *obj,
                           struct thandle *handle);
-static int mdd_lookup_intent(const struct lu_context *ctxt,
-                             struct md_object *pobj,
-                             const char *name, const struct lu_fid* fid,
-                             int mask, struct md_ucred *uc);
+static int __mdd_lookup(const struct lu_context *ctxt,
+                        struct md_object *pobj,
+                        const char *name, const struct lu_fid* fid,
+                        int mask, struct md_ucred *uc);
+static int __mdd_lookup_locked(const struct lu_context *ctxt,
+                               struct md_object *pobj,
+                               const char *name, const struct lu_fid* fid,
+                               int mask, struct md_ucred *uc);
 static int mdd_exec_permission_lite(const struct lu_context *ctxt,
                                     struct mdd_object *obj,
                                     struct md_ucred *uc);
@@ -1707,7 +1711,8 @@ static int mdd_parent_fid(const struct lu_context *ctxt,
                           struct mdd_object *obj,
                           struct lu_fid *fid)
 {
-        return mdd_lookup_intent(ctxt, &obj->mod_obj, dotdot, fid, 0, NULL);
+        return __mdd_lookup_locked(ctxt, &obj->mod_obj,
+                                   dotdot, fid, 0, NULL);
 }
 
 /*
@@ -1959,10 +1964,10 @@ out:
         RETURN(rc);
 }
 
-static int mdd_lookup_intent(const struct lu_context *ctxt,
-                             struct md_object *pobj,
-                             const char *name, const struct lu_fid* fid,
-                             int mask, struct md_ucred *uc)
+static int
+__mdd_lookup(const struct lu_context *ctxt, struct md_object *pobj,
+             const char *name, const struct lu_fid* fid, int mask,
+             struct md_ucred *uc)
 {
         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
         struct dt_object    *dir = mdd_object_child(mdd_obj);
@@ -1974,31 +1979,43 @@ static int mdd_lookup_intent(const struct lu_context *ctxt,
         if (mdd_is_dead_obj(mdd_obj))
                 RETURN(-ESTALE);
 
-        mdd_read_lock(ctxt, mdd_obj);
         if (mask == MAY_EXEC)
                 rc = mdd_exec_permission_lite(ctxt, mdd_obj, uc);
         else
                 rc = mdd_permission_internal(ctxt, mdd_obj, mask, uc);
         if (rc)
-                GOTO(out_unlock, rc);
+                RETURN(rc);
 
         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(ctxt, dir))
                 rc = dir->do_index_ops->dio_lookup(ctxt, dir, rec, key);
         else
                 rc = -ENOTDIR;
 
-out_unlock:
-        mdd_read_unlock(ctxt, mdd_obj);
         RETURN(rc);
 }
 
+static int
+__mdd_lookup_locked(const struct lu_context *ctxt, struct md_object *pobj,
+                    const char *name, const struct lu_fid* fid, int mask,
+                    struct md_ucred *uc)
+{
+        struct mdd_object *mdd_obj = md2mdd_obj(pobj);
+        int rc;
+        
+        mdd_read_lock(ctxt, mdd_obj);
+        rc = __mdd_lookup(ctxt, pobj, name, fid, mask, uc); 
+        mdd_read_unlock(ctxt, mdd_obj);
+
+        return rc;
+}
+
 static int mdd_lookup(const struct lu_context *ctxt,
                       struct md_object *pobj, const char *name,
                       struct lu_fid* fid, struct md_ucred *uc)
 {
         int rc;
         ENTRY;
-        rc = mdd_lookup_intent(ctxt, pobj, name, fid, MAY_EXEC, uc);
+        rc = __mdd_lookup_locked(ctxt, pobj, name, fid, MAY_EXEC, uc);
         RETURN(rc);
 }
 
@@ -2164,7 +2181,8 @@ static int mdd_create_sanity_check(const struct lu_context *ctxt,
         if (mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);
 
-        rc = mdd_lookup_intent(ctxt, pobj, name, fid, MAY_WRITE | MAY_EXEC, uc);
+        rc = __mdd_lookup_locked(ctxt, pobj, name, fid,
+                                 MAY_WRITE | MAY_EXEC, uc);
         if (rc != -ENOENT)
                 RETURN(rc ? : -EEXIST);
 
@@ -2442,7 +2460,10 @@ static int mdd_object_create(const struct lu_context *ctxt,
         RETURN(rc);
 }
 
-/* partial operation */
+/*
+ * Partial operation. Be aware, this is called with write lock taken, so we use
+ * locksless version of __mdd_lookup() here.
+ */
 static int mdd_ni_sanity_check(const struct lu_context *ctxt,
                                struct md_object *pobj,
                                const char *name,
@@ -2457,7 +2478,7 @@ static int mdd_ni_sanity_check(const struct lu_context *ctxt,
         if (mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);
 
-        rc = mdd_lookup_intent(ctxt, pobj, name, fid, MAY_WRITE | MAY_EXEC, uc);
+        rc = __mdd_lookup(ctxt, pobj, name, fid, MAY_WRITE | MAY_EXEC, uc);
         if (rc != -ENOENT)
                 RETURN(rc ? : -EEXIST);
         else
@@ -2493,6 +2514,10 @@ out_unlock:
         RETURN(rc);
 }
 
+/*
+ * Be aware, this is called with write lock taken, so we use locksless version
+ * of __mdd_lookup() here.
+ */
 static int mdd_nr_sanity_check(const struct lu_context *ctxt,
                                struct md_object *pobj,
                                const char *name,
@@ -2508,7 +2533,7 @@ static int mdd_nr_sanity_check(const struct lu_context *ctxt,
         if (mdd_is_dead_obj(obj))
                 RETURN(-ENOENT);
 
-        rc = mdd_lookup_intent(ctxt, pobj, name, fid, MAY_WRITE | MAY_EXEC, uc);
+        rc = __mdd_lookup(ctxt, pobj, name, fid, MAY_WRITE | MAY_EXEC, uc);
         RETURN(rc);
 }
 
index f774a4c..ca0a967 100644 (file)
@@ -1664,6 +1664,7 @@ static void mdt_thread_info_init(struct ptlrpc_request *req,
         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
         info->mti_ctxt = req->rq_svc_thread->t_ctx;
         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+        
         /* it can be NULL while CONNECT */
         if (req->rq_export)
                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
index 610d9dd..f0aa06c 100644 (file)
@@ -603,9 +603,10 @@ static int mdt_open_by_fid(struct mdt_thread_info* info,
         if (rc > 0) {
                 const struct lu_context *ctxt = info->mti_ctxt;
 
-                mdt_set_disposition(info, rep, DISP_IT_EXECD);
-                mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
-                mdt_set_disposition(info, rep, DISP_LOOKUP_POS);
+                mdt_set_disposition(info, rep, (DISP_IT_EXECD |
+                                                DISP_LOOKUP_EXECD |
+                                                DISP_LOOKUP_POS));
+
                 rc = mo_attr_get(ctxt, mdt_object_child(o), ma, NULL);
                 if (rc == 0)
                         rc = mdt_mfd_open(info, NULL, o, flags, 0, rep);
@@ -650,13 +651,13 @@ static int mdt_cross_open(struct mdt_thread_info* info,
                         rc = mdt_mfd_open(info, NULL, o, flags, 0, rep);
         } else if (rc == 0) {
                 /*
-                 * FIXME: something wrong here lookup was positive but there is
+                 * FIXME: something wrong here, lookup was positive but there is
                  * no object!
                  */
-                CERROR("Cross-ref object doesn't exists!\n");
+                CERROR("Cross-ref object doesn't exist!\n");
                 rc = -EFAULT;
         } else  {
-                /* FIXME: something wrong here the object is on another MDS! */
+                /* FIXME: something wrong here, the object is on another MDS! */
                 CERROR("The object isn't on this server! FLD error?\n");
                 rc = -EFAULT;
         }
@@ -705,10 +706,10 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
         }
 
         CDEBUG(D_INODE, "I am going to open "DFID"/("DFID":%s) "
-                        "cr_flag=0%o mode=0%06o msg_flag=0x%x\n",
-                        PFID(rr->rr_fid1), PFID(rr->rr_fid2),
-                        rr->rr_name, create_flags, la->la_mode,
-                        lustre_msg_get_flags(req->rq_reqmsg));
+               "cr_flag=0%o mode=0%06o msg_flag=0x%x\n",
+               PFID(rr->rr_fid1), PFID(rr->rr_fid2),
+               rr->rr_name, create_flags, la->la_mode,
+               lustre_msg_get_flags(req->rq_reqmsg));
 
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
                 /* This is a replay request. */
@@ -722,18 +723,19 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
                  * via a regular replay.
                  */
                 if (!(create_flags & MDS_OPEN_CREAT)) {
-                        DEBUG_REQ(D_ERROR, req,"OPEN_CREAT not in open replay");
+                        DEBUG_REQ(D_ERROR, req,"OPEN & CREAT not in open replay.");
                         GOTO(out, result = -EFAULT);
                 }
-                CDEBUG(D_INFO, "Open replay failed to find object, "
-                       "continue as regular open\n");
+                CDEBUG(D_INFO, "Open replay did find object, continue as "
+                       "regular open\n");
         }
 
         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK))
                 GOTO(out, result = -ENOMEM);
 
-        mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
-        mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
+        mdt_set_disposition(info, ldlm_rep,
+                            (DISP_IT_EXECD | DISP_LOOKUP_EXECD));
+        
         if (rr->rr_name[0] == 0) {
                 /* this is cross-ref open */
                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
index bf5d9ba..4e37dea 100644 (file)
@@ -1823,7 +1823,6 @@ static int ptlrpc_replay_interpret(struct ptlrpc_request *req,
         RETURN(rc);
 }
 
-
 int ptlrpc_replay_req(struct ptlrpc_request *req)
 {
         struct ptlrpc_replay_async_args *aa;