Whamcloud - gitweb
LU-1187 mdt: unlink remote directory
authorwangdi <di.wang@whamcloud.com>
Tue, 19 Nov 2013 14:34:21 +0000 (06:34 -0800)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 2 Feb 2013 23:55:51 +0000 (18:55 -0500)
Send unlink req to the slave MDT, so both open/close
and unlink will send request to the slave MDT, where
the remote object resides. Then it would be able to
check orphan (unlink open file) locally.

Add lock_object api for enqueue remote object.

Change-Id: I7483b0f023e4e3de6597da58d3d9f3e96c0d53b7
Signed-off-by: Wang Di <di.wang@intel.com>
Reviewed-on: http://review.whamcloud.com/4339
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
25 files changed:
lustre/include/dt_object.h
lustre/include/lustre_dlm.h
lustre/include/lustre_update.h
lustre/include/md_object.h
lustre/ldlm/ldlm_request.c
lustre/llite/namei.c
lustre/lmv/lmv_obd.c
lustre/lod/lod_dev.c
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdd/mdd_permission.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_mds.c
lustre/mdt/mdt_reint.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osp/osp_dev.c
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/tests/replay-dual.sh
lustre/tests/replay-single.sh
lustre/tests/test-framework.sh

index 07f9a4b..8de5cc5 100644 (file)
@@ -67,6 +67,7 @@ struct dt_object;
 struct dt_index_features;
 struct niobuf_local;
 struct niobuf_remote;
+struct ldlm_enqueue_info;
 
 typedef enum {
         MNTOPT_USERXATTR        = 0x00000001,
@@ -524,6 +525,12 @@ struct dt_body_operations {
                           struct lustre_capa *capa);
 };
 
+struct dt_lock_operations {
+       int (*do_object_lock)(const struct lu_env *env, struct dt_object *dt,
+                             struct lustre_handle *lh,
+                             struct ldlm_enqueue_info *einfo,
+                             void *policy);
+};
 /**
  * Incomplete type of index record.
  */
@@ -667,6 +674,7 @@ struct dt_object {
         const struct dt_object_operations *do_ops;
         const struct dt_body_operations   *do_body_ops;
         const struct dt_index_operations  *do_index_ops;
+       const struct dt_lock_operations   *do_lock_ops;
 };
 
 /*
@@ -865,6 +873,17 @@ local_index_find_or_create_with_fid(const struct lu_env *env,
                                    const char *name, __u32 mode,
                                    const struct dt_index_features *ft);
 
+static inline int dt_object_lock(const struct lu_env *env,
+                                struct dt_object *o, struct lustre_handle *lh,
+                                struct ldlm_enqueue_info *einfo,
+                                void *policy)
+{
+       LASSERT(o);
+       LASSERT(o->do_lock_ops);
+       LASSERT(o->do_lock_ops->do_object_lock);
+       return o->do_lock_ops->do_object_lock(env, o, lh, einfo, policy);
+}
+
 int dt_lookup_dir(const struct lu_env *env, struct dt_object *dir,
                  const char *name, struct lu_fid *fid);
 
index 689ba01..45ea747 100644 (file)
@@ -1588,6 +1588,11 @@ int ldlm_prep_elc_req(struct obd_export *exp,
                       struct ptlrpc_request *req,
                       int version, int opc, int canceloff,
                       cfs_list_t *cancels, int count);
+
+struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len);
+int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
+                        const struct ldlm_request *dlm_req,
+                        const struct ldlm_callback_suite *cbs);
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                           ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
                          __u64 *flags, void *lvb, __u32 lvb_len,
index d85b8a1..d79cd70 100644 (file)
@@ -31,7 +31,7 @@
 #ifndef _LUSTRE_UPDATE_H
 #define _LUSTRE_UPDATE_H
 
-#define UPDATE_BUFFER_SIZE     4096
+#define UPDATE_BUFFER_SIZE     8192
 struct update_request {
        struct dt_device        *ur_dt;
        cfs_list_t              ur_list;    /* attached itself to thandle */
index 2048cc6..d1554ea 100644 (file)
@@ -200,12 +200,12 @@ struct md_op_spec {
         /** Create flag from client: such as MDS_OPEN_CREAT, and others. */
         __u64      sp_cr_flags;
 
-                    /** don't create lov objects or llog cookie - this replay */
+       /** don't create lov objects or llog cookie - this replay */
        unsigned int no_create:1,
-                    /** Should mdd do lookup sanity check or not. */
-                    sp_cr_lookup:1;
+                    sp_cr_lookup:1, /* do lookup sanity check or not. */
+                    sp_rm_entry:1;  /* only remove name entry */
 
-        /** Current lock mode for parent dir where create is performing. */
+       /** Current lock mode for parent dir where create is performing. */
         mdl_mode_t sp_cr_mode;
 
         /** to create directory */
@@ -285,6 +285,10 @@ struct md_object_operations {
         int (*moo_file_unlock)(const struct lu_env *env, struct md_object *obj,
                                struct lov_mds_md *lmm,
                                struct lustre_handle *lockh);
+       int (*moo_object_lock)(const struct lu_env *env, struct md_object *obj,
+                              struct lustre_handle *lh,
+                              struct ldlm_enqueue_info *einfo,
+                              void *policy);
 };
 
 /**
@@ -713,6 +717,16 @@ static inline int mo_file_unlock(const struct lu_env *env, struct md_object *m,
         return m->mo_ops->moo_file_unlock(env, m, lmm, lockh);
 }
 
+static inline int mo_object_lock(const struct lu_env *env,
+                                struct md_object *m,
+                                struct lustre_handle *lh,
+                                struct ldlm_enqueue_info *einfo,
+                                void *policy)
+{
+       LASSERT(m->mo_ops->moo_object_lock);
+       return m->mo_ops->moo_object_lock(env, m, lh, einfo, policy);
+}
+
 static inline int mdo_lookup(const struct lu_env *env,
                              struct md_object *p,
                              const struct lu_name *lname,
@@ -792,8 +806,8 @@ static inline int mdo_unlink(const struct lu_env *env,
                              const struct lu_name *lname,
                              struct md_attr *ma)
 {
-        LASSERT(c->mo_dir_ops->mdo_unlink);
-        return c->mo_dir_ops->mdo_unlink(env, p, c, lname, ma);
+       LASSERT(p->mo_dir_ops->mdo_unlink);
+       return p->mo_dir_ops->mdo_unlink(env, p, c, lname, ma);
 }
 
 static inline int mdo_lum_lmm_cmp(const struct lu_env *env,
@@ -903,5 +917,20 @@ int llo_local_objects_setup(const struct lu_env *env,
 int lustre_buf2som(void *buf, int rc, struct md_som_data *msd);
 int lustre_buf2hsm(void *buf, int rc, struct md_hsm *mh);
 void lustre_hsm2buf(void *buf, struct md_hsm *mh);
+
+#define md_cap_t(x) (x)
+
+#define MD_CAP_TO_MASK(x) (1 << (x))
+
+#define md_cap_raised(c, flag) (md_cap_t(c) & MD_CAP_TO_MASK(flag))
+
+/* capable() is copied from linux kernel! */
+static inline int md_capable(struct lu_ucred *uc, cfs_cap_t cap)
+{
+       if (md_cap_raised(uc->uc_cap, cap))
+               return 1;
+       return 0;
+}
+
 /** @} md */
 #endif /* _LINUX_MD_OBJECT_H */
index 74d775d..8e1dcef 100644 (file)
@@ -828,6 +828,28 @@ int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req,
 }
 EXPORT_SYMBOL(ldlm_prep_enqueue_req);
 
+struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len)
+{
+       struct ptlrpc_request *req;
+       int rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE);
+       if (req == NULL)
+               RETURN(ERR_PTR(-ENOMEM));
+
+       rc = ldlm_prep_enqueue_req(exp, req, NULL, 0);
+       if (rc) {
+               ptlrpc_request_free(req);
+               RETURN(ERR_PTR(rc));
+       }
+
+       req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len);
+       ptlrpc_request_set_replen(req);
+       RETURN(req);
+}
+EXPORT_SYMBOL(ldlm_enqueue_pack);
+
 /**
  * Client-side lock enqueue.
  *
index f52a890..9073f85 100644 (file)
@@ -1127,6 +1127,7 @@ static int ll_rmdir_generic(struct inode *dir, struct dentry *dparent,
                 RETURN(PTR_ERR(op_data));
 
         ll_get_child_fid(dir, name, &op_data->op_fid3);
+       op_data->op_fid2 = op_data->op_fid3;
         rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
         ll_finish_md_op_data(op_data);
         if (rc == 0) {
@@ -1270,11 +1271,12 @@ static int ll_unlink_generic(struct inode *dir, struct dentry *dparent,
         if (IS_ERR(op_data))
                 RETURN(PTR_ERR(op_data));
 
-        ll_get_child_fid(dir, name, &op_data->op_fid3);
-        rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
-        ll_finish_md_op_data(op_data);
-        if (rc)
-                GOTO(out, rc);
+       ll_get_child_fid(dir, name, &op_data->op_fid3);
+       op_data->op_fid2 = op_data->op_fid3;
+       rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
+       ll_finish_md_op_data(op_data);
+       if (rc)
+               GOTO(out, rc);
 
         ll_update_times(request, dir);
         ll_stats_ops_tally(ll_i2sbi(dir), LPROC_LL_UNLINK, 1);
index 763a6b1..fa3e183 100644 (file)
@@ -1988,14 +1988,19 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
        struct obd_device       *obd = exp->exp_obd;
        struct lmv_obd          *lmv = &obd->u.lmv;
        struct lmv_tgt_desc     *tgt = NULL;
+       struct mdt_body         *body;
        int                      rc;
        ENTRY;
 
        rc = lmv_check_connect(obd);
        if (rc)
                RETURN(rc);
-
-       tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
+retry:
+       /* Send unlink requests to the MDT where the child is located */
+       if (likely(!fid_is_zero(&op_data->op_fid2)))
+               tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid2);
+       else
+               tgt = lmv_locate_mds(lmv, op_data, &op_data->op_fid1);
        if (IS_ERR(tgt))
                RETURN(PTR_ERR(tgt));
 
@@ -2021,9 +2026,25 @@ static int lmv_unlink(struct obd_export *exp, struct md_op_data *op_data,
        if (rc != 0)
                RETURN(rc);
 
+       CDEBUG(D_INODE, "unlink with fid="DFID"/"DFID" -> mds #%d\n",
+              PFID(&op_data->op_fid1), PFID(&op_data->op_fid2), tgt->ltd_idx);
+
        rc = md_unlink(tgt->ltd_exp, op_data, request);
+       if (rc != 0 && rc != -EREMOTE)
+               RETURN(rc);
 
-       RETURN(rc);
+       body = req_capsule_server_get(&(*request)->rq_pill, &RMF_MDT_BODY);
+       if (body == NULL)
+               RETURN(-EPROTO);
+       /*
+        * Not cross-ref case, just get out of here.
+        */
+       if (likely(!(body->valid & OBD_MD_MDS)))
+               RETURN(0);
+
+       /* Clearly this is a remote object, try remote MDT */
+       op_data->op_fid2 = body->fid1;
+       goto retry;
 }
 
 static int lmv_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
index 5d8f5fe..13e24cf 100644 (file)
@@ -98,6 +98,7 @@ int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
 extern struct lu_object_operations lod_lu_obj_ops;
 extern struct lu_object_operations lod_lu_robj_ops;
 extern struct dt_object_operations lod_obj_ops;
+extern struct dt_lock_operations   lod_lock_ops;
 
 /* Slab for OSD object allocation */
 cfs_mem_cache_t *lod_object_kmem;
@@ -141,6 +142,7 @@ struct lu_object *lod_object_alloc(const struct lu_env *env,
        lu_obj = lod2lu_obj(lod_obj);
        dt_object_init(&lod_obj->ldo_obj, NULL, dev);
        lod_obj->ldo_obj.do_ops = &lod_obj_ops;
+       lod_obj->ldo_obj.do_lock_ops = &lod_lock_ops;
        if (likely(mds == lu_site2seq(dev->ld_site)->ss_node_id))
                lu_obj->lo_ops = &lod_lu_obj_ops;
        else
index f082c91..2f288a4 100644 (file)
@@ -1131,6 +1131,26 @@ struct dt_object_operations lod_obj_ops = {
        .do_object_sync         = lod_object_sync,
 };
 
+static int lod_object_lock(const struct lu_env *env,
+                          struct dt_object *dt, struct lustre_handle *lh,
+                          struct ldlm_enqueue_info *einfo,
+                          void *policy)
+{
+       struct dt_object   *next = dt_object_child(dt);
+       int              rc;
+       ENTRY;
+
+       /*
+        * declare setattr on the local object
+        */
+       rc = dt_object_lock(env, next, lh, einfo, policy);
+
+       RETURN(rc);
+}
+
+struct dt_lock_operations lod_lock_ops = {
+       .do_object_lock       = lod_object_lock,
+};
 static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt,
                        struct lu_buf *buf, loff_t *pos,
                        struct lustre_capa *capa)
index 773940e..c5f65e8 100644 (file)
@@ -369,7 +369,34 @@ static inline int mdd_is_sticky(const struct lu_env *env,
        if (tmp_la->la_uid == uc->uc_fsuid)
                return 0;
 
-       return !mdd_capable(uc, CFS_CAP_FOWNER);
+       return !md_capable(uc, CFS_CAP_FOWNER);
+}
+
+static int mdd_may_delete_entry(const struct lu_env *env,
+                               struct mdd_object *pobj, int check_perm)
+{
+       ENTRY;
+
+       LASSERT(pobj != NULL);
+       if (!mdd_object_exists(pobj))
+               RETURN(-ENOENT);
+
+       if (mdd_is_dead_obj(pobj))
+               RETURN(-ENOENT);
+
+       if (check_perm) {
+               int rc;
+               rc = mdd_permission_internal_locked(env, pobj, NULL,
+                                           MAY_WRITE | MAY_EXEC,
+                                           MOR_TGT_PARENT);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       if (mdd_is_append(pobj))
+               RETURN(-EPERM);
+
+       RETURN(0);
 }
 
 /*
@@ -383,31 +410,21 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
         int rc = 0;
         ENTRY;
 
-        LASSERT(cobj);
+       if (pobj) {
+               rc = mdd_may_delete_entry(env, pobj, check_perm);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       if (cobj == NULL)
+               RETURN(0);
+
         if (!mdd_object_exists(cobj))
                 RETURN(-ENOENT);
 
         if (mdd_is_dead_obj(cobj))
                 RETURN(-ESTALE);
 
-        if (pobj) {
-                if (!mdd_object_exists(pobj))
-                        RETURN(-ENOENT);
-
-                if (mdd_is_dead_obj(pobj))
-                        RETURN(-ENOENT);
-
-                if (check_perm) {
-                        rc = mdd_permission_internal_locked(env, pobj, NULL,
-                                                    MAY_WRITE | MAY_EXEC,
-                                                    MOR_TGT_PARENT);
-                        if (rc)
-                                RETURN(rc);
-                }
-
-                if (mdd_is_append(pobj))
-                        RETURN(-EPERM);
-        }
 
        if (mdd_is_sticky(env, pobj, cobj))
                 RETURN(-EPERM);
@@ -1095,6 +1112,7 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
                               const struct lu_name *name, struct md_attr *ma,
                               struct thandle *handle)
 {
+       struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
         int rc;
 
         rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
@@ -1105,31 +1123,37 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
         if (rc)
                 return rc;
 
-        rc = mdo_declare_ref_del(env, c, handle);
-        if (rc)
-                return rc;
+       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
+       la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, p, la, handle);
+       if (rc)
+               return rc;
 
-        rc = mdo_declare_ref_del(env, c, handle);
-        if (rc)
-                return rc;
+       if (c != NULL) {
+               rc = mdo_declare_ref_del(env, c, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdo_declare_attr_set(env, p, NULL, handle);
-        if (rc)
-                return rc;
+               rc = mdo_declare_ref_del(env, c, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdo_declare_attr_set(env, c, NULL, handle);
-        if (rc)
-                return rc;
+               rc = mdo_declare_attr_set(env, c, NULL, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdd_declare_finish_unlink(env, c, ma, handle);
-        if (rc)
-                return rc;
+               rc = mdd_declare_finish_unlink(env, c, ma, handle);
+               if (rc)
+                       return rc;
 
-       rc = mdd_declare_links_del(env, c, handle);
-       if (rc != 0)
-               return rc;
+               rc = mdd_declare_links_del(env, c, handle);
+               if (rc != 0)
+                       return rc;
 
-       rc = mdd_declare_changelog_store(env, mdd, name, handle);
+               /* FIXME: need changelog for remove entry */
+               rc = mdd_declare_changelog_store(env, mdd, name, handle);
+       }
 
        return rc;
 }
@@ -1142,74 +1166,96 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
        struct lu_attr     *cattr = &mdd_env_info(env)->mti_cattr;
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
-        struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
+       struct mdd_object *mdd_cobj = NULL;
         struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
         struct thandle    *handle;
-       int rc, is_dir;
+       int rc, is_dir = 0;
         ENTRY;
 
-        if (mdd_object_exists(mdd_cobj) <= 0)
-                RETURN(-ENOENT);
+       /* cobj == NULL means only delete name entry */
+       if (likely(cobj != NULL)) {
+               mdd_cobj = md2mdd_obj(cobj);
+               if (mdd_object_exists(mdd_cobj) == 0)
+                       RETURN(-ENOENT);
+               /* currently it is assume, it could only delete
+                * name entry of remote directory */
+               is_dir = 1;
+       }
 
-        handle = mdd_trans_create(env, mdd);
+       handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
-                                lname, ma, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
+                               lname, ma, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        rc = mdd_trans_start(env, mdd, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
-        if (dlh == NULL)
+       dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
+       if (dlh == NULL)
                GOTO(stop, rc = -ENOMEM);
-        mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
 
-       /* fetch cattr */
-       rc = mdd_la_get(env, mdd_cobj, cattr, mdd_object_capa(env, mdd_cobj));
-        if (rc)
-                GOTO(cleanup, rc);
+       if (likely(mdd_cobj != NULL)) {
+               mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
 
-       is_dir = S_ISDIR(cattr->la_mode);
+               /* fetch cattr */
+               rc = mdd_la_get(env, mdd_cobj, cattr,
+                               mdd_object_capa(env, mdd_cobj));
+               if (rc)
+                       GOTO(cleanup, rc);
+
+               is_dir = S_ISDIR(cattr->la_mode);
+
+       }
 
        rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr);
-        if (rc)
-                GOTO(cleanup, rc);
+       if (rc)
+               GOTO(cleanup, rc);
 
        rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
                                mdd_object_capa(env, mdd_pobj));
        if (rc)
                GOTO(cleanup, rc);
 
-       rc = mdo_ref_del(env, mdd_cobj, handle);
-       if (rc != 0) {
-               __mdd_index_insert_only(env, mdd_pobj, mdo2fid(mdd_cobj),
-                                       name, handle,
-                                       mdd_object_capa(env, mdd_pobj));
-               GOTO(cleanup, rc);
+       if (likely(mdd_cobj != NULL)) {
+               rc = mdo_ref_del(env, mdd_cobj, handle);
+               if (rc != 0) {
+                       __mdd_index_insert_only(env, mdd_pobj,
+                                               mdo2fid(mdd_cobj),
+                                               name, handle,
+                                               mdd_object_capa(env, mdd_pobj));
+                       GOTO(cleanup, rc);
+               }
+
+               if (is_dir)
+                       /* unlink dot */
+                       mdo_ref_del(env, mdd_cobj, handle);
+
+               /* fetch updated nlink */
+               rc = mdd_la_get(env, mdd_cobj, cattr,
+                               mdd_object_capa(env, mdd_cobj));
+               if (rc)
+                       GOTO(cleanup, rc);
        }
 
-        if (is_dir)
-                /* unlink dot */
-                mdo_ref_del(env, mdd_cobj, handle);
+       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
+       la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
 
-       /* fetch updated nlink */
-       rc = mdd_la_get(env, mdd_cobj, cattr, mdd_object_capa(env, mdd_cobj));
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
        if (rc)
                GOTO(cleanup, rc);
 
-        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-        la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
-
-        la->la_valid = LA_CTIME | LA_MTIME;
-       rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
-        if (rc)
-                GOTO(cleanup, rc);
+       /* Enough for only unlink the entry */
+       if (unlikely(mdd_cobj == NULL)) {
+               mdd_pdo_write_unlock(env, mdd_pobj, dlh);
+               GOTO(stop, rc);
+       }
 
        if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
                 /* update ctime of an unlinked file only if it is still
@@ -1651,7 +1697,6 @@ out:
         return rc;
 }
 
-
 /*
  * Create object and insert it into namespace.
  */
index f2d21a4..d5e4286 100644 (file)
@@ -433,21 +433,6 @@ struct lu_object *mdd_object_alloc(const struct lu_env *env,
                                    const struct lu_object_header *hdr,
                                    struct lu_device *d);
 
-/* mdd_permission.c */
-#define mdd_cap_t(x) (x)
-
-#define MDD_CAP_TO_MASK(x) (1 << (x))
-
-#define mdd_cap_raised(c, flag) (mdd_cap_t(c) & MDD_CAP_TO_MASK(flag))
-
-/* capable() is copied from linux kernel! */
-static inline int mdd_capable(struct lu_ucred *uc, cfs_cap_t cap)
-{
-       if (mdd_cap_raised(uc->uc_cap, cap))
-               return 1;
-       return 0;
-}
-
 int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode,
                   struct thandle *handle);
 int __mdd_declare_acl_init(const struct lu_env *env, struct mdd_object *obj,
index 973f893..64ea295 100644 (file)
@@ -771,18 +771,18 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                                 (LUSTRE_IMMUTABLE_FL | LUSTRE_APPEND_FL);
 
                if ((uc->uc_fsuid != tmp_la->la_uid) &&
-                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                   !md_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
 
-                /* XXX: the IMMUTABLE and APPEND_ONLY flags can
-                 * only be changed by the relevant capability. */
-                if (mdd_is_immutable(obj))
-                        oldflags |= LUSTRE_IMMUTABLE_FL;
-                if (mdd_is_append(obj))
-                        oldflags |= LUSTRE_APPEND_FL;
-                if ((oldflags ^ newflags) &&
-                    !mdd_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
-                        RETURN(-EPERM);
+               /* XXX: the IMMUTABLE and APPEND_ONLY flags can
+                * only be changed by the relevant capability. */
+               if (mdd_is_immutable(obj))
+                       oldflags |= LUSTRE_IMMUTABLE_FL;
+               if (mdd_is_append(obj))
+                       oldflags |= LUSTRE_APPEND_FL;
+               if ((oldflags ^ newflags) &&
+                   !md_capable(uc, CFS_CAP_LINUX_IMMUTABLE))
+                       RETURN(-EPERM);
 
                 if (!S_ISDIR(tmp_la->la_mode))
                         la->la_flags &= ~LUSTRE_DIRSYNC_FL;
@@ -797,7 +797,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
        if ((la->la_valid & (LA_MTIME | LA_ATIME | LA_CTIME)) &&
            !(la->la_valid & ~(LA_MTIME | LA_ATIME | LA_CTIME))) {
                if ((uc->uc_fsuid != tmp_la->la_uid) &&
-                   !mdd_capable(uc, CFS_CAP_FOWNER)) {
+                   !md_capable(uc, CFS_CAP_FOWNER)) {
                        rc = mdd_permission_internal(env, obj, tmp_la,
                                                     MAY_WRITE);
                        if (rc)
@@ -830,7 +830,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
         if (la->la_valid & LA_MODE) {
                if (!(flags & MDS_PERM_BYPASS) &&
                    (uc->uc_fsuid != tmp_la->la_uid) &&
-                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                   !md_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
 
                 if (la->la_mode == (cfs_umode_t) -1)
@@ -839,14 +839,14 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                         la->la_mode = (la->la_mode & S_IALLUGO) |
                                       (tmp_la->la_mode & ~S_IALLUGO);
 
-                /* Also check the setgid bit! */
-                if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
-                                       la->la_gid : tmp_la->la_gid) &&
-                    !mdd_capable(uc, CFS_CAP_FSETID))
-                        la->la_mode &= ~S_ISGID;
-        } else {
-               la->la_mode = tmp_la->la_mode;
-        }
+               /* Also check the setgid bit! */
+               if (!lustre_in_group_p(uc, (la->la_valid & LA_GID) ?
+                                      la->la_gid : tmp_la->la_gid) &&
+                   !md_capable(uc, CFS_CAP_FSETID))
+                       la->la_mode &= ~S_ISGID;
+       } else {
+              la->la_mode = tmp_la->la_mode;
+       }
 
         /* Make sure a caller can chown. */
         if (la->la_valid & LA_UID) {
@@ -854,7 +854,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                         la->la_uid = tmp_la->la_uid;
                if (((uc->uc_fsuid != tmp_la->la_uid) ||
                     (la->la_uid != tmp_la->la_uid)) &&
-                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                   !md_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);
 
                 /* If the user or group of a non-directory has been
@@ -880,7 +880,7 @@ static int mdd_fix_attr(const struct lu_env *env, struct mdd_object *obj,
                if (((uc->uc_fsuid != tmp_la->la_uid) ||
                     ((la->la_gid != tmp_la->la_gid) &&
                      !lustre_in_group_p(uc, la->la_gid))) &&
-                   !mdd_capable(uc, CFS_CAP_CHOWN))
+                   !md_capable(uc, CFS_CAP_CHOWN))
                        RETURN(-EPERM);
 
                 /* Likewise, if the user or group of a non-directory
@@ -1176,7 +1176,7 @@ static int mdd_xattr_sanity_check(const struct lu_env *env,
                RETURN(rc);
 
        if ((uc->uc_fsuid != tmp_la->la_uid) &&
-           !mdd_capable(uc, CFS_CAP_FOWNER))
+           !md_capable(uc, CFS_CAP_FOWNER))
                RETURN(-EPERM);
 
        RETURN(rc);
@@ -1802,7 +1802,7 @@ static int mdd_open_sanity_check(const struct lu_env *env,
                if (uc && ((uc->uc_valid == UCRED_OLD) ||
                           (uc->uc_valid == UCRED_NEW)) &&
                    (uc->uc_fsuid != tmp_la->la_uid) &&
-                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                   !md_capable(uc, CFS_CAP_FOWNER))
                        RETURN(-EPERM);
         }
 #endif
@@ -2167,6 +2167,18 @@ static int mdd_object_sync(const struct lu_env *env, struct md_object *obj)
         return dt_object_sync(env, mdd_object_child(mdd_obj));
 }
 
+static int mdd_object_lock(const struct lu_env *env,
+                          struct md_object *obj,
+                          struct lustre_handle *lh,
+                          struct ldlm_enqueue_info *einfo,
+                          void *policy)
+{
+       struct mdd_object *mdd_obj = md2mdd_obj(obj);
+       LASSERT(mdd_object_exists(mdd_obj));
+       return dt_object_lock(env, mdd_object_child(mdd_obj), lh,
+                             einfo, policy);
+}
+
 const struct md_object_operations mdd_obj_ops = {
        .moo_permission         = mdd_permission,
        .moo_attr_get           = mdd_attr_get,
@@ -2184,4 +2196,5 @@ const struct md_object_operations mdd_obj_ops = {
        .moo_capa_get           = mdd_capa_get,
        .moo_object_sync        = mdd_object_sync,
        .moo_path               = mdd_path,
+       .moo_object_lock        = mdd_object_lock,
 };
index 1debe32..86e779e 100644 (file)
@@ -318,17 +318,17 @@ int __mdd_permission_internal(const struct lu_env *env, struct mdd_object *obj,
                 RETURN(0);
 
 check_capabilities:
-        if (!(mask & MAY_EXEC) ||
-            (la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode))
-                if (mdd_capable(uc, CFS_CAP_DAC_OVERRIDE))
-                        RETURN(0);
+       if (!(mask & MAY_EXEC) ||
+           (la->la_mode & S_IXUGO) || S_ISDIR(la->la_mode))
+               if (md_capable(uc, CFS_CAP_DAC_OVERRIDE))
+                       RETURN(0);
 
-        if ((mask == MAY_READ) ||
-            (S_ISDIR(la->la_mode) && !(mask & MAY_WRITE)))
-                if (mdd_capable(uc, CFS_CAP_DAC_READ_SEARCH))
-                        RETURN(0);
+       if ((mask == MAY_READ) ||
+           (S_ISDIR(la->la_mode) && !(mask & MAY_WRITE)))
+               if (md_capable(uc, CFS_CAP_DAC_READ_SEARCH))
+                       RETURN(0);
 
-        RETURN(-EACCES);
+       RETURN(-EACCES);
 }
 
 int mdd_permission(const struct lu_env *env,
@@ -415,7 +415,7 @@ int mdd_permission(const struct lu_env *env,
                        uc = lu_ucred_assert(env);
 
                if (la->la_uid != uc->uc_fsuid &&
-                   !mdd_capable(uc, CFS_CAP_FOWNER))
+                   !md_capable(uc, CFS_CAP_FOWNER))
                        rc = -EPERM;
         }
 
index bdb13e1..15d653a 100644 (file)
@@ -91,7 +91,6 @@ ldlm_mode_t mdt_dlm_lock_modes[] = {
         [MDL_GROUP]   = LCK_GROUP
 };
 
-
 static struct mdt_device *mdt_dev(struct lu_device *d);
 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
 static int mdt_fid2path(const struct lu_env *env, struct mdt_device *mdt,
@@ -142,6 +141,7 @@ void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm)
 {
         lh->mlh_pdo_hash = 0;
         lh->mlh_reg_mode = lm;
+       lh->mlh_rreg_mode = lm;
         lh->mlh_type = MDT_REG_LOCK;
 }
 
@@ -149,6 +149,7 @@ void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
                        const char *name, int namelen)
 {
         lh->mlh_reg_mode = lm;
+       lh->mlh_rreg_mode = lm;
         lh->mlh_type = MDT_PDO_LOCK;
 
         if (name != NULL && (name[0] != '\0')) {
@@ -1254,7 +1255,8 @@ relock:
                         GOTO(out_child, rc = -ENOENT);
                 }
 
-                if (!(child_bits & MDS_INODELOCK_UPDATE)) {
+               if (!(child_bits & MDS_INODELOCK_UPDATE) &&
+                     mdt_object_exists(child) > 0) {
                         struct md_attr *ma = &info->mti_attr;
 
                         ma->ma_valid = 0;
@@ -1331,7 +1333,8 @@ relock:
                          (unsigned long)res_id->name[1],
                          (unsigned long)res_id->name[2],
                          PFID(mdt_object_fid(child)));
-                mdt_pack_size2body(info, child);
+               if (mdt_object_exists(child) > 0)
+                       mdt_pack_size2body(info, child);
         }
         if (lock)
                 LDLM_LOCK_PUT(lock);
@@ -1697,18 +1700,19 @@ static long mdt_reint_opcode(struct mdt_thread_info *info,
 
 int mdt_reint(struct mdt_thread_info *info)
 {
-        long opc;
-        int  rc;
-
-        static const struct req_format *reint_fmts[REINT_MAX] = {
-                [REINT_SETATTR]  = &RQF_MDS_REINT_SETATTR,
-                [REINT_CREATE]   = &RQF_MDS_REINT_CREATE,
-                [REINT_LINK]     = &RQF_MDS_REINT_LINK,
-                [REINT_UNLINK]   = &RQF_MDS_REINT_UNLINK,
-                [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
-                [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
-                [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR
-        };
+       long opc;
+       int  rc;
+
+       static const struct req_format *reint_fmts[REINT_MAX] = {
+               [REINT_SETATTR]  = &RQF_MDS_REINT_SETATTR,
+               [REINT_CREATE]   = &RQF_MDS_REINT_CREATE,
+               [REINT_LINK]     = &RQF_MDS_REINT_LINK,
+               [REINT_UNLINK]   = &RQF_MDS_REINT_UNLINK,
+               [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
+               [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
+               [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
+               [REINT_RMENTRY] = &RQF_MDS_REINT_UNLINK
+       };
 
         ENTRY;
 
@@ -2378,6 +2382,57 @@ int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
         RETURN(rc);
 }
 
+int mdt_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+                       void *data, int flag)
+{
+       struct lustre_handle lockh;
+       int               rc;
+
+       switch (flag) {
+       case LDLM_CB_BLOCKING:
+               ldlm_lock2handle(lock, &lockh);
+               rc = ldlm_cli_cancel(&lockh);
+               if (rc < 0) {
+                       CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
+                       RETURN(rc);
+               }
+               break;
+       case LDLM_CB_CANCELING:
+               LDLM_DEBUG(lock, "Revoke remote lock\n");
+               break;
+       default:
+               LBUG();
+       }
+       RETURN(0);
+}
+
+int mdt_remote_object_lock(struct mdt_thread_info *mti,
+                          struct mdt_object *o, struct lustre_handle *lh,
+                          ldlm_mode_t mode, __u64 ibits)
+{
+       struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
+       ldlm_policy_data_t *policy = &mti->mti_policy;
+       int rc = 0;
+       ENTRY;
+
+       LASSERT(mdt_object_exists(o) < 0);
+
+       LASSERT((ibits & MDS_INODELOCK_UPDATE));
+
+       memset(einfo, 0, sizeof(*einfo));
+       einfo->ei_type = LDLM_IBITS;
+       einfo->ei_mode = mode;
+       einfo->ei_cb_bl = mdt_md_blocking_ast;
+       einfo->ei_cb_cp = ldlm_completion_ast;
+
+       memset(policy, 0, sizeof(*policy));
+       policy->l_inodebits.bits = ibits;
+
+       rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
+                           policy);
+       RETURN(rc);
+}
+
 static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
                            struct mdt_lock_handle *lh, __u64 ibits,
                            bool nonblock, int locality)
@@ -2396,7 +2451,6 @@ static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
 
         if (mdt_object_exists(o) < 0) {
                 if (locality == MDT_CROSS_LOCK) {
-                        /* cross-ref object fix */
                         ibits &= ~MDS_INODELOCK_UPDATE;
                         ibits |= MDS_INODELOCK_LOOKUP;
                 } else {
@@ -2568,6 +2622,9 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
         mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
         mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
 
+       if (lustre_handle_is_used(&lh->mlh_rreg_lh))
+               ldlm_lock_decref(&lh->mlh_rreg_lh, lh->mlh_rreg_mode);
+
         EXIT;
 }
 
@@ -2882,6 +2939,8 @@ void mdt_lock_handle_init(struct mdt_lock_handle *lh)
         lh->mlh_reg_mode = LCK_MINMODE;
         lh->mlh_pdo_lh.cookie = 0ull;
         lh->mlh_pdo_mode = LCK_MINMODE;
+       lh->mlh_rreg_lh.cookie = 0ull;
+       lh->mlh_rreg_mode = LCK_MINMODE;
 }
 
 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
index c70d8ae..339fc1a 100644 (file)
@@ -238,6 +238,10 @@ struct mdt_lock_handle {
         struct lustre_handle    mlh_pdo_lh;
         ldlm_mode_t             mlh_pdo_mode;
         unsigned int            mlh_pdo_hash;
+
+       /* Remote regular lock */
+       struct lustre_handle    mlh_rreg_lh;
+       ldlm_mode_t          mlh_rreg_mode;
 };
 
 enum {
@@ -314,6 +318,9 @@ struct tx_arg {
                        struct lu_buf   buf;
                        loff_t          pos;
                } write;
+               struct {
+                       struct ost_body     *body;
+               } destroy;
        } u;
 };
 
@@ -657,6 +664,9 @@ void mdt_object_unlock_put(struct mdt_thread_info *,
 
 void mdt_client_compatibility(struct mdt_thread_info *info);
 
+int mdt_remote_object_lock(struct mdt_thread_info *mti,
+                          struct mdt_object *o, struct lustre_handle *lh,
+                          ldlm_mode_t mode, __u64 ibits);
 int mdt_close_unpack(struct mdt_thread_info *info);
 int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op);
 int mdt_reint_rec(struct mdt_thread_info *, struct mdt_lock_handle *);
index e662147..42b894a 100644 (file)
@@ -1081,12 +1081,18 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
         else
                 ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
 
-        info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
+       info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
 
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
 }
 
+static int mdt_rmentry_unpack(struct mdt_thread_info *info)
+{
+       info->mti_spec.sp_rm_entry = 1;
+       return mdt_unlink_unpack(info);
+}
+
 static int mdt_rename_unpack(struct mdt_thread_info *info)
 {
        struct lu_ucred         *uc = mdt_ucred(info);
@@ -1329,13 +1335,14 @@ static int mdt_setxattr_unpack(struct mdt_thread_info *info)
 typedef int (*reint_unpacker)(struct mdt_thread_info *info);
 
 static reint_unpacker mdt_reint_unpackers[REINT_MAX] = {
-        [REINT_SETATTR]  = mdt_setattr_unpack,
-        [REINT_CREATE]   = mdt_create_unpack,
-        [REINT_LINK]     = mdt_link_unpack,
-        [REINT_UNLINK]   = mdt_unlink_unpack,
-        [REINT_RENAME]   = mdt_rename_unpack,
-        [REINT_OPEN]     = mdt_open_unpack,
-        [REINT_SETXATTR] = mdt_setxattr_unpack
+       [REINT_SETATTR]  = mdt_setattr_unpack,
+       [REINT_CREATE]   = mdt_create_unpack,
+       [REINT_LINK]     = mdt_link_unpack,
+       [REINT_UNLINK]   = mdt_unlink_unpack,
+       [REINT_RENAME]   = mdt_rename_unpack,
+       [REINT_OPEN]     = mdt_open_unpack,
+       [REINT_SETXATTR] = mdt_setxattr_unpack,
+       [REINT_RMENTRY]  = mdt_rmentry_unpack,
 };
 
 int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op)
index d5d4dde..fb37a2c 100644 (file)
@@ -51,9 +51,7 @@
 #include <lustre_mds.h>
 #include <lustre_mdt.h>
 #include "mdt_internal.h"
-#ifdef HAVE_QUOTA_SUPPORT
-# include <lustre_quota.h>
-#endif
+#include <lustre_quota.h>
 #include <lustre_acl.h>
 #include <lustre_param.h>
 #include <lustre_fsfilt.h>
index b906337..171ee3e 100644 (file)
@@ -91,7 +91,6 @@ static void mdt_obj_version_get(struct mdt_thread_info *info,
                                 struct mdt_object *o, __u64 *version)
 {
         LASSERT(o);
-        LASSERT(mdt_object_exists(o) >= 0);
        if (mdt_object_exists(o) > 0 && !mdt_object_obf(o))
                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
         else
@@ -308,6 +307,14 @@ static int mdt_md_create(struct mdt_thread_info *info)
 
                if (mdt_object_exists(child) < 0) {
                        struct seq_server_site *ss;
+                       struct lu_ucred *uc  = mdt_ucred(info);
+
+                       if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
+                               CERROR("%s: Creating remote dir is only "
+                                      "permitted for administrator: rc = %d\n",
+                                       mdt2obd_dev(mdt)->obd_name, -EPERM);
+                               GOTO(out_put_child, rc = -EPERM);
+                       }
 
                        ss = mdt_seq_site(mdt);
                        if (ss->ss_node_id != 0 &&
@@ -671,46 +678,115 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
                 RETURN(err_serious(-ENOENT));
 
         /*
-        * step 1: lock the parent.
+        * step 1: Found the parent.
          */
-        parent_lh = &info->mti_lh[MDT_LH_PARENT];
-       mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
-                         rr->rr_namelen);
+       mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
+       if (IS_ERR(mp)) {
+               rc = PTR_ERR(mp);
+               GOTO(out, rc);
+       }
 
-        mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
-                                  MDS_INODELOCK_UPDATE);
-       if (IS_ERR(mp))
-               GOTO(out, rc = PTR_ERR(mp));
+       if (mdt_object_obf(mp))
+               GOTO(put_parent, rc = -EPERM);
+
+       parent_lh = &info->mti_lh[MDT_LH_PARENT];
+       lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
+       if (mdt_object_exists(mp) < 0) {
+               mdt_lock_reg_init(parent_lh, LCK_EX);
+               rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh,
+                                           parent_lh->mlh_rreg_mode,
+                                           MDS_INODELOCK_UPDATE);
+               if (rc != ELDLM_OK)
+                       GOTO(put_parent, rc);
+
+       } else {
+               mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
+                                 rr->rr_namelen);
+               rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
+                                    MDT_LOCAL_LOCK);
+               if (rc)
+                       GOTO(put_parent, rc);
 
-        if (mdt_object_obf(mp))
-                GOTO(out_unlock_parent, rc = -EPERM);
+               rc = mdt_version_get_check_save(info, mp, 0);
+               if (rc)
+                       GOTO(unlock_parent, rc);
+       }
 
-        rc = mdt_version_get_check_save(info, mp, 0);
-        if (rc)
-                GOTO(out_unlock_parent, rc);
+       /* step 2: find & lock the child */
+       /* lookup child object along with version checking */
+       fid_zero(child_fid);
+       rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
+       if (rc != 0)
+               GOTO(unlock_parent, rc);
 
         mdt_reint_init_ma(info, ma);
 
-        /* step 2: find & lock the child */
-        lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
-        /* lookup child object along with version checking */
-        fid_zero(child_fid);
-        rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1);
-        if (rc != 0)
-                 GOTO(out_unlock_parent, rc);
+       /* We will lock the child regardless it is local or remote. No harm. */
+       mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
+       if (IS_ERR(mc))
+               GOTO(unlock_parent, rc = PTR_ERR(mc));
 
-        /* We will lock the child regardless it is local or remote. No harm. */
-        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
-        if (IS_ERR(mc))
-                GOTO(out_unlock_parent, rc = PTR_ERR(mc));
         child_lh = &info->mti_lh[MDT_LH_CHILD];
         mdt_lock_reg_init(child_lh, LCK_EX);
+       if (mdt_object_exists(mc) < 0) {
+               struct mdt_body  *repbody;
+
+               if (!fid_is_zero(rr->rr_fid2)) {
+                       CDEBUG(D_INFO, "%s: name %s can not find "DFID"\n",
+                              mdt2obd_dev(info->mti_mdt)->obd_name,
+                              (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
+                       GOTO(unlock_parent, rc = -ENOENT);
+               }
+               CDEBUG(D_INFO, "%s: name %s: "DFID" is another MDT\n",
+                      mdt2obd_dev(info->mti_mdt)->obd_name,
+                      (char *)rr->rr_name, PFID(mdt_object_fid(mc)));
+
+               if (info->mti_spec.sp_rm_entry) {
+                       struct lu_ucred *uc  = mdt_ucred(info);
+
+                       if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) {
+                               CERROR("%s: unlink remote entry is only "
+                                      "permitted for administrator: rc = %d\n",
+                                       mdt2obd_dev(info->mti_mdt)->obd_name,
+                                       -EPERM);
+                               GOTO(unlock_parent, rc = -EPERM);
+                       }
+
+                       ma->ma_need = MA_INODE;
+                       ma->ma_valid = 0;
+                       mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
+                       rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
+                                       NULL, lname, ma);
+                       mdt_object_put(info->mti_env, mc);
+                       GOTO(unlock_parent, rc);
+               }
+               /* Revoke the LOOKUP lock of the remote object granted by
+                * this MDT. Since the unlink will happen on another MDT,
+                * it will release the LOOKUP lock right away. Then What
+                * would happen if another client try to grab the LOOKUP
+                * lock at the same time with unlink XXX */
+               mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP,
+                               MDT_CROSS_LOCK);
+               repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
+               LASSERT(repbody != NULL);
+               repbody->fid1 = *mdt_object_fid(mc);
+               repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS);
+               mdt_object_unlock_put(info, mc, child_lh, rc);
+               GOTO(unlock_parent, rc = -EREMOTE);
+       } else if (info->mti_spec.sp_rm_entry) {
+               CERROR("%s: lfs rmdir should not be used on local dir %s\n",
+                      mdt2obd_dev(info->mti_mdt)->obd_name,
+                      (char *)rr->rr_name);
+               mdt_object_put(info->mti_env, mc);
+               GOTO(unlock_parent, rc = -EPERM);
+       }
+
         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
                              MDT_CROSS_LOCK);
-        if (rc != 0) {
-                mdt_object_put(info->mti_env, mc);
-                GOTO(out_unlock_parent, rc);
-        }
+       if (rc != 0) {
+               mdt_object_put(info->mti_env, mc);
+               GOTO(unlock_parent, rc);
+       }
 
         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
@@ -752,8 +828,10 @@ static int mdt_reint_unlink(struct mdt_thread_info *info,
         EXIT;
 
         mdt_object_unlock_put(info, mc, child_lh, rc);
-out_unlock_parent:
-        mdt_object_unlock_put(info, mp, parent_lh, rc);
+unlock_parent:
+       mdt_object_unlock(info, mp, parent_lh, rc);
+put_parent:
+       mdt_object_put(info->mti_env, mp);
 out:
         return rc;
 }
@@ -1189,13 +1267,14 @@ typedef int (*mdt_reinter)(struct mdt_thread_info *info,
                            struct mdt_lock_handle *lhc);
 
 static mdt_reinter reinters[REINT_MAX] = {
-        [REINT_SETATTR]  = mdt_reint_setattr,
-        [REINT_CREATE]   = mdt_reint_create,
-        [REINT_LINK]     = mdt_reint_link,
-        [REINT_UNLINK]   = mdt_reint_unlink,
-        [REINT_RENAME]   = mdt_reint_rename,
-        [REINT_OPEN]     = mdt_reint_open,
-        [REINT_SETXATTR] = mdt_reint_setxattr
+       [REINT_SETATTR]  = mdt_reint_setattr,
+       [REINT_CREATE]   = mdt_reint_create,
+       [REINT_LINK]     = mdt_reint_link,
+       [REINT_UNLINK]   = mdt_reint_unlink,
+       [REINT_RENAME]   = mdt_reint_rename,
+       [REINT_OPEN]     = mdt_reint_open,
+       [REINT_SETXATTR] = mdt_reint_setxattr,
+       [REINT_RMENTRY]  = mdt_reint_unlink
 };
 
 int mdt_reint_rec(struct mdt_thread_info *info,
index 8ce07f5..7d4d486 100644 (file)
@@ -2309,15 +2309,9 @@ static struct inode *osd_create_remote_inode(const struct lu_env *env,
        oh = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle->h_transaction != NULL);
 
-#ifdef HAVE_QUOTA_SUPPORT
-       osd_push_ctxt(info->oti_env, save);
-#endif
        /* FIXME: Insert index api needs to know the mode of
         * the remote object. Just use S_IFDIR for now */
        local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR);
-#ifdef HAVE_QUOTA_SUPPORT
-       osd_pop_ctxt(info->oti_env, save);
-#endif
        if (IS_ERR(local)) {
                CERROR("%s: create local error %d\n", osd_name(osd),
                       (int)PTR_ERR(local));
@@ -3793,11 +3787,8 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
        if (rc <= 0)
                RETURN(rc);
 
-#ifdef OSD_DECLARE_OP
-#define OSD_OT_CREATE create
-#define OSD_OT_INSERT insert
-#define osd_trans_declare_op(env, oh, op, cred) OSD_DECLARE_OP(oh, op, cred)
-#endif
+       rc = 0;
+
        osd_trans_declare_op(env, oh, OSD_OT_CREATE,
                             osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
        osd_trans_declare_op(env, oh, OSD_OT_INSERT,
index 9a66737..a35b392 100644 (file)
@@ -729,11 +729,14 @@ out_precreat:
        if (!m->opd_connect_mdt)
                osp_precreate_fini(m);
 out_last_used:
-       osp_last_used_fini(env, m);
+       if (!m->opd_connect_mdt)
+               osp_last_used_fini(env, m);
 out_proc:
        ptlrpc_lprocfs_unregister_obd(obd);
        lprocfs_obd_cleanup(obd);
        obd_cleanup_client_import(obd);
+       if (m->opd_symlink)
+               lprocfs_remove(&m->opd_symlink);
        client_obd_cleanup(obd);
 out_ref:
        ptlrpcd_decref();
index 9df10ed..f4493d0 100644 (file)
@@ -128,7 +128,7 @@ static struct update_request
        if (!update)
                return ERR_PTR(-ENOMEM);
 
-       OBD_ALLOC(update->ur_buf, UPDATE_BUFFER_SIZE);
+       OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
        if (update->ur_buf == NULL) {
                OBD_FREE_PTR(update);
                return ERR_PTR(-ENOMEM);
@@ -150,7 +150,7 @@ static void osp_destroy_update_req(struct update_request *update)
 
        cfs_list_del(&update->ur_list);
        if (update->ur_buf != NULL)
-               OBD_FREE(update->ur_buf, UPDATE_BUFFER_SIZE);
+               OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
 
        OBD_FREE_PTR(update);
        return;
@@ -1069,3 +1069,45 @@ struct dt_object_operations osp_md_obj_ops = {
        .do_index_try         = osp_md_index_try,
 };
 
+static int osp_md_object_lock(const struct lu_env *env,
+                             struct dt_object *dt,
+                             struct lustre_handle *lh,
+                             struct ldlm_enqueue_info *einfo,
+                             void *policy)
+{
+       struct osp_thread_info *info = osp_env_info(env);
+       struct ldlm_res_id     *res_id = &info->osi_resid;
+       struct dt_device       *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+       struct osp_device      *osp = dt2osp_dev(dt_dev);
+       struct ptlrpc_request  *req = NULL;
+       int                     rc = 0;
+       __u64                   flags = 0;
+       ldlm_mode_t             mode;
+
+       fid_build_reg_res_name(lu_object_fid(&dt->do_lu), res_id);
+
+       mode = ldlm_lock_match(osp->opd_obd->obd_namespace,
+                              LDLM_FL_BLOCK_GRANTED, res_id,
+                              einfo->ei_type,
+                              (ldlm_policy_data_t *)policy,
+                              einfo->ei_mode, lh, 0);
+       if (mode > 0)
+               return ELDLM_OK;
+
+       req = ldlm_enqueue_pack(osp->opd_exp, 0);
+       if (IS_ERR(req))
+               RETURN(PTR_ERR(req));
+
+       rc = ldlm_cli_enqueue(osp->opd_exp, &req, einfo, res_id,
+                             (const ldlm_policy_data_t *)policy,
+                             &flags, NULL, 0, LVB_T_NONE, lh, 0);
+
+       ptlrpc_req_finished(req);
+
+       return rc == ELDLM_OK ? 0 : -EIO;
+}
+
+struct dt_lock_operations osp_md_lock_ops = {
+       .do_object_lock       = osp_md_object_lock,
+};
+
index 695a31e..2426be1 100644 (file)
@@ -369,6 +369,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
 
                po->opo_obj.do_ops = &osp_md_obj_ops;
                o->lo_header->loh_attr |= LOHA_REMOTE;
+               po->opo_obj.do_lock_ops = &osp_md_lock_ops;
                /* Do not need get attr for new object */
                if (!(conf != NULL && (conf->loc_flags & LOC_F_NEW) != 0)) {
                        rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
index e859a21..c765d2e 100755 (executable)
@@ -674,8 +674,8 @@ test_22c () {
 
        do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
 
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE       0x188
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
@@ -701,8 +701,8 @@ test_22d () {
 
        do_node $CLIENT1 mkdir -p $MOUNT1/${tdir}
 
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE       0x188
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir &
        CLIENT_PID=$!
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
@@ -808,8 +808,8 @@ test_23c () {
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir ||
                        error "lfs mkdir failed"
 
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE       0x188
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
        do_node $CLIENT1 rmdir $MOUNT1/$remote_dir &
        CLIENT_PID=$!
        do_facet mds${MDTIDX} lctl set_param fail_loc=0
@@ -837,8 +837,8 @@ test_23d () {
        do_node $CLIENT1 $LFS mkdir -i $MDTIDX $MOUNT1/$remote_dir ||
                        error "lfs mkdir failed"
 
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE       0x188
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
        do_node $CLIENT1 rmdir $MOUNT1/$remote_dir &
        CLIENT_PID=$!
        do_facet mds${MDTIDX} lctl set_param fail_loc=0
index 7750d01..24653fc 100755 (executable)
@@ -2006,8 +2006,8 @@ test_80a() {
        local remote_dir=$DIR/$tdir/remote_dir
 
        mkdir -p $DIR/$tdir
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE         0x188
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
@@ -2034,8 +2034,8 @@ test_80b() {
        local remote_dir=$DIR/$tdir/remote_dir
 
        mkdir -p $DIR/$tdir
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE         0x188
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
@@ -2062,8 +2062,8 @@ test_80c() {
        local remote_dir=$DIR/$tdir/remote_dir
 
        mkdir -p $DIR/$tdir
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE         0x188
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
@@ -2086,8 +2086,8 @@ test_80d() {
        local remote_dir=$DIR/$tdir/remote_dir
 
        mkdir -p $DIR/$tdir
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE         0x188
-       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x1500
        $LFS mkdir -i $MDTIDX $remote_dir &
        local CLIENT_PID=$!
        do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
@@ -2223,8 +2223,8 @@ test_81a() {
        mkdir -p $DIR/$tdir
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE         0x188
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
        rmdir $remote_dir &
        local CLIENT_PID=$!
        do_facet mds${MDTIDX} lctl set_param fail_loc=0
@@ -2253,8 +2253,8 @@ test_81b() {
        mkdir -p $DIR/$tdir
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE         0x188
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
        rmdir $remote_dir &
        local CLIENT_PID=$!
        do_facet mds${MDTIDX} lctl set_param fail_loc=0
@@ -2284,8 +2284,8 @@ test_81c() {
        mkdir -p $DIR/$tdir
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE         0x188
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
        rmdir $remote_dir &
        local CLIENT_PID=$!
        do_facet mds${MDTIDX} lctl set_param fail_loc=0
@@ -2311,8 +2311,8 @@ test_81d() {
        mkdir -p $DIR/$tdir
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
-       # OBD_FAIL_MDS_DROP_OBJ_UPDATE         0x188
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x188
+       # OBD_FAIL_UPDATE_OBJ_NET    0x1500
+       do_facet mds${MDTIDX} lctl set_param fail_loc=0x1500
        rmdir $remote_dir &
        local CLIENT_PID=$!
        do_facet mds${MDTIDX} lctl set_param fail_loc=0
@@ -2343,10 +2343,10 @@ test_81e() {
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
 
@@ -2374,10 +2374,10 @@ test_81f() {
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds$((MDTIDX + 1))
 
@@ -2405,10 +2405,10 @@ test_81g() {
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds${MDTIDX}
        fail mds$((MDTIDX + 1))
@@ -2432,10 +2432,10 @@ test_81h() {
        $LFS mkdir -i $MDTIDX $remote_dir || error "lfs mkdir failed"
 
        # OBD_FAIL_MDS_REINT_NET_REP       0x119
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0x119
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0x119
        rmdir $remote_dir &
        local CLIENT_PID=$!
-       do_facet mds${MDTIDX} lctl set_param fail_loc=0
+       do_facet mds$((MDTIDX + 1)) lctl set_param fail_loc=0
 
        fail mds${MDTIDX},mds$((MDTIDX + 1))
 
index 989031b..e87097b 100644 (file)
@@ -3795,11 +3795,11 @@ drop_reint_reply() {
 }
 
 drop_update_reply() {
-# OBD_FAIL_MDS_OBJ_UPDATE_NET
+# OBD_FAIL_UPDATE_OBJ_NET
        local index=$1
        shift 1
        RC=0
-       do_facet mds${index} lctl set_param fail_loc=0x188
+       do_facet mds${index} lctl set_param fail_loc=0x1500
        do_facet client "$@" || RC=$?
        do_facet mds${index} lctl set_param fail_loc=0
        return $RC