Whamcloud - gitweb
- introduce open counter in mdd
authortappro <tappro>
Sat, 12 Aug 2006 11:21:29 +0000 (11:21 +0000)
committertappro <tappro>
Sat, 12 Aug 2006 11:21:29 +0000 (11:21 +0000)
- update mo_close() method with adding the md_attr
- split mdd_attr_get() into two sub-function __mdd_atte_get, __mdd_lov_get
  because they are needed for internal use in mdd
- return lov ea and llog cookie in mdt_close() for unlinked file
- mdt_handle_last_unlink() checks the ma_valid fields to decide what to return
  to the client

lustre/cmm/cmm_object.c
lustre/include/md_object.h
lustre/mdd/mdd_handler.c
lustre/mdd/mdd_internal.h
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_open.c

index 9a68ca1..a275d6f 100644 (file)
@@ -309,11 +309,12 @@ static int cml_open(const struct lu_context *ctx, struct md_object *mo)
         RETURN(rc);
 }
 
-static int cml_close(const struct lu_context *ctx, struct md_object *mo)
+static int cml_close(const struct lu_context *ctx, struct md_object *mo,
+                     struct md_attr *ma)
 {
         int rc;
         ENTRY;
-        rc = mo_close(ctx, md_object_next(mo));
+        rc = mo_close(ctx, md_object_next(mo), ma);
         RETURN(rc);
 }
 
@@ -612,7 +613,8 @@ static int cmr_open(const struct lu_context *ctx, struct md_object *mo)
         RETURN(-EREMOTE);
 }
 
-static int cmr_close(const struct lu_context *ctx, struct md_object *mo)
+static int cmr_close(const struct lu_context *ctx, struct md_object *mo,
+                     struct md_attr *ma)
 {
         RETURN(-EFAULT);
 }
index 56e0c1f..18a2053 100644 (file)
@@ -116,7 +116,8 @@ struct md_object_operations {
         int (*moo_ref_del)(const struct lu_context *, struct md_object *,
                            struct md_attr *);
         int (*moo_open)(const struct lu_context *, struct md_object *);
-        int (*moo_close)(const struct lu_context *, struct md_object *);
+        int (*moo_close)(const struct lu_context *, struct md_object *,
+                         struct md_attr *);
 };
 
 /*
@@ -295,10 +296,11 @@ static inline int mo_open(const struct lu_context *cx, struct md_object *m)
         return m->mo_ops->moo_open(cx, m);
 }
 
-static inline int mo_close(const struct lu_context *cx, struct md_object *m)
+static inline int mo_close(const struct lu_context *cx, struct md_object *m,
+                           struct md_attr *ma)
 {
         LASSERT(m->mo_ops->moo_close);
-        return m->mo_ops->moo_close(cx, m);
+        return m->mo_ops->moo_close(cx, m, ma);
 }
 
 static inline int mo_readpage(const struct lu_context *cx, struct md_object *m,
index 828352b..659754c 100644 (file)
@@ -183,6 +183,7 @@ static int mdd_may_create(const struct lu_context *ctxt,
         /*check pobj may create or not*/
         RETURN(0);
 }
+
 /*Check whether it may delete the cobj under the pobj*/
 static int mdd_may_delete(const struct lu_context *ctxt,
                           struct mdd_object *pobj, struct mdd_object *cobj,
@@ -214,34 +215,64 @@ static int mdd_may_delete(const struct lu_context *ctxt,
         RETURN(rc);
 }
 
-static int mdd_attr_get(const struct lu_context *ctxt,
-                        struct md_object *obj, struct md_attr *ma)
+static int __mdd_attr_get(const struct lu_context *ctxt,
+                          struct mdd_object *obj, struct md_attr *ma)
 {
-        struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct dt_object  *next;
         int                rc;
 
         ENTRY;
 
-        LASSERT(lu_object_exists(ctxt, &obj->mo_lu));
+        LASSERT(lu_object_exists(ctxt, mdd2lu_obj(obj)));
 
-        next = mdd_object_child(mdd_obj);
+        next = mdd_object_child(obj);
         rc = next->do_ops->do_attr_get(ctxt, next, &ma->ma_attr);
         if (rc == 0) {
                 LASSERT((ma->ma_attr.la_mode & S_IFMT) ==
-                        (obj->mo_lu.lo_header->loh_attr & S_IFMT));
+                        (lu_object_attr(mdd2lu_obj(obj)) & S_IFMT));
                 ma->ma_valid = MA_INODE;
-                /* get LOV EA also */
-                if ((S_ISREG(ma->ma_attr.la_mode)
-                     || S_ISDIR(ma->ma_attr.la_mode))
-                     && ma->ma_lmm != 0 && ma->ma_lmm_size > 0) {
-                        rc = mdd_get_md(ctxt, obj, ma->ma_lmm, &ma->ma_lmm_size);
-                        if (rc > 0) {
-                                ma->ma_valid |= MA_LOV;
-                                rc = 0;
-                        }
+        }
+
+        RETURN(rc);
+}
+
+static int __mdd_lov_get(const struct lu_context *ctxt,
+                         struct mdd_object *obj, struct md_attr *ma)
+{
+        struct dt_object  *next;
+        int                rc = 0;
+
+        ENTRY;
+
+        next = mdd_object_child(obj);
+        if ((S_ISREG(ma->ma_attr.la_mode) || S_ISDIR(ma->ma_attr.la_mode))
+            && ma->ma_lmm != 0 && ma->ma_lmm_size > 0) {
+                rc = mdd_get_md(ctxt, &obj->mod_obj,
+                                ma->ma_lmm, &ma->ma_lmm_size);
+                if (rc > 0) {
+                        ma->ma_valid |= MA_LOV;
+                        rc = 0;
                 }
         }
+
+        RETURN(rc);
+}
+
+static int mdd_attr_get(const struct lu_context *ctxt,
+                        struct md_object *obj, struct md_attr *ma)
+{
+        struct mdd_object *mdd_obj = md2mdd_obj(obj);
+        struct dt_object  *next;
+        int                rc;
+
+        ENTRY;
+
+        next = mdd_object_child(mdd_obj);
+        rc = __mdd_attr_get(ctxt, mdd_obj, ma);
+        if (rc == 0) {
+                /* get LOV EA also */
+                rc = __mdd_lov_get(ctxt, mdd_obj, ma);
+        }
         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64"\n",
                         rc, ma->ma_valid);
         RETURN(rc);
@@ -781,6 +812,29 @@ static int mdd_dir_is_empty(const struct lu_context *ctx,
         return result;
 }
 
+static int __mdd_finish_unlink(const struct lu_context *ctxt,
+                               struct mdd_object *obj, struct md_attr *ma)
+{
+        int rc;
+        ENTRY;
+        rc = __mdd_attr_get(ctxt, obj, ma);
+        if (rc)
+                RETURN(rc);
+
+        if (atomic_read(&obj->mod_count) == 0 &&
+            ma->ma_attr.la_nlink == 0) {
+                rc = __mdd_lov_get(ctxt, obj, ma);
+                if (rc) {
+                        CERROR("Can't get LOV attr during unlink()\n");
+                }
+                if(S_ISREG(ma->ma_attr.la_mode) && 
+                   ma->ma_valid & MA_LOV)
+                        rc = mdd_unlink_log(ctxt, mdo2mdd(&obj->mod_obj),
+                                            obj, ma);                
+        }
+        RETURN(rc);
+}
+
 static int mdd_unlink_sanity_check(const struct lu_context *ctxt,
                                    struct mdd_object *pobj,
                                    struct mdd_object *cobj,
@@ -795,7 +849,7 @@ static int mdd_unlink_sanity_check(const struct lu_context *ctxt,
                 RETURN(rc);
 
         if (S_ISDIR(mdd_object_type(cobj)) && 
-                        dt_try_as_dir(ctxt, dt_cobj)) {
+            dt_try_as_dir(ctxt, dt_cobj)) {
                 rc = mdd_dir_is_empty(ctxt, cobj);
                 if (rc != 0)
                         RETURN(rc);
@@ -832,23 +886,15 @@ static int mdd_unlink(const struct lu_context *ctxt, struct md_object *pobj,
                 GOTO(cleanup, rc);
 
         __mdd_ref_del(ctxt, mdd_cobj, handle);
-        if (S_ISDIR(ma->ma_attr.la_mode)) {
+        if (S_ISDIR(lu_object_attr(&cobj->mo_lu))) {
                 /* unlink dot */
                 __mdd_ref_del(ctxt, mdd_cobj, handle);
                 /* unlink dotdot */
                 __mdd_ref_del(ctxt, mdd_pobj, handle);
         }
-        mdd_attr_get(ctxt, cobj, ma);
-
-        mdd_set_dead_obj(mdd_cobj);
-#if 0
-        /*This should be moved to handle last unlink. wait open
-         * orphan prototype finished*/
-        if (S_ISREG(ma->ma_attr.la_mode) && (ma->ma_valid & MA_LOV) &&
-            ma->ma_attr.la_nlink == 0 && cobj->mo_lu.lo_header->loh_ref == 1) {
-                rc = mdd_unlink_log(ctxt, mdd, mdd_cobj, ma);
-        }
-#endif
+        
+        rc = __mdd_finish_unlink(ctxt, mdd_cobj, ma);
+
 cleanup:
         mdd_unlock2(ctxt, mdd_pobj, mdd_cobj);
         mdd_trans_stop(ctxt, mdd, handle);
@@ -996,10 +1042,11 @@ static int mdd_rename(const struct lu_context *ctxt, struct md_object *src_pobj,
         if (tobj)
                 mdd_tobj = md2mdd_obj(tobj);
 
+        /*XXX: shouldn't this check be done under lock below? */
         rc = mdd_rename_sanity_check(ctxt, mdd_spobj, mdd_tpobj,
                                      mdd_sobj, mdd_tobj);
         if (rc)
-                GOTO(cleanup, rc);
+                RETURN(rc);
 
         mdd_txn_param_build(ctxt, &MDD_TXN_RENAME);
         handle = mdd_trans_start(ctxt, mdd);
@@ -1546,6 +1593,7 @@ static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
         struct mdd_object *mdd_obj = md2mdd_obj(obj);
         struct mdd_device *mdd = mdo2mdd(obj);
         struct thandle *handle;
+        int rc;
         ENTRY;
 
         mdd_txn_param_build(ctxt, &MDD_TXN_XATTR_SET);
@@ -1554,22 +1602,44 @@ static int mdd_ref_del(const struct lu_context *ctxt, struct md_object *obj,
                 RETURN(-ENOMEM);
 
         mdd_lock(ctxt, mdd_obj, DT_WRITE_LOCK);
+        
+        /* rmdir checks */
+        if (S_ISDIR(lu_object_attr(&obj->mo_lu)) && 
+            dt_try_as_dir(ctxt, mdd_object_child(mdd_obj))) {
+                rc = mdd_dir_is_empty(ctxt, mdd_obj);
+                if (rc != 0)
+                        GOTO(cleanup, rc);
+        }
+
         __mdd_ref_del(ctxt, mdd_obj, handle);
-        mdd_attr_get(ctxt, obj, ma);
-        mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
+        if (S_ISDIR(lu_object_attr(&obj->mo_lu))) {
+                /* unlink dot */
+                __mdd_ref_del(ctxt, mdd_obj, handle);
+        }
+        
+        rc = __mdd_finish_unlink(ctxt, mdd_obj, ma);
 
+cleanup:        
+        mdd_unlock(ctxt, mdd_obj, DT_WRITE_LOCK);
         mdd_trans_stop(ctxt, mdd, handle);
-
-        RETURN(0);
+        RETURN(rc);
 }
 
 static int mdd_open(const struct lu_context *ctxt, struct md_object *obj)
 {
+        atomic_inc(&md2mdd_obj(obj)->mod_count);
         return 0;
 }
 
-static int mdd_close(const struct lu_context *ctxt, struct md_object *obj)
+static int mdd_close(const struct lu_context *ctxt, struct md_object *obj,
+                     struct md_attr *ma)
 {
+        __mdd_attr_get(ctxt, md2mdd_obj(obj), ma);
+        
+        if (atomic_dec_and_test(&md2mdd_obj(obj)->mod_count)) {
+                CWARN("File closed\n");
+        }
+
         return 0;
 }
 
index c355615..1969a1f 100644 (file)
@@ -48,6 +48,8 @@ enum mod_flags {
 
 struct mdd_object {
         struct md_object  mod_obj;
+        /* open count */
+        atomic_t          mod_count;
         unsigned long     mod_flags;
 };
 
index 65447f5..6f9ea7a 100644 (file)
@@ -1307,7 +1307,7 @@ static int mdt_recovery(struct ptlrpc_request *req)
 
                 rc = mds_filter_recovery_request(req, obd, &should_process);
                 if (rc != 0 || !should_process) {
-                        LASSERT(rc < 0);
+                        //LASSERT(rc < 0);
                         RETURN(rc);
                 }
         }
@@ -2608,6 +2608,7 @@ static int mdt_destroy_export(struct obd_export *export)
         struct obd_device *obd = export->exp_obd;
         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
         struct lu_context ctxt;
+        struct md_attr ma;
         int rc = 0;
         ENTRY;
 
@@ -2628,13 +2629,17 @@ static int mdt_destroy_export(struct obd_export *export)
                 struct list_head *tmp = med->med_open_head.next;
                 struct mdt_file_data *mfd =
                         list_entry(tmp, struct mdt_file_data, mfd_list);
+                struct mdt_object *o = mfd->mfd_object;
 
                 /* Remove mfd handle so it can't be found again.
                  * We are consuming the mfd_list reference here. */
                 class_handle_unhash(&mfd->mfd_handle);
                 list_del_init(&mfd->mfd_list);
                 spin_unlock(&med->med_open_lock);
-                mdt_mfd_close(&ctxt, mdt, mfd);
+                mdt_mfd_close(&ctxt, mdt, mfd, &ma);
+                /* TODO: if we close the unlinked file,
+                 * we need to remove it's objects from OST */
+                mdt_object_put(&ctxt, o);
                 spin_lock(&med->med_open_lock);
         }
         spin_unlock(&med->med_open_lock);
index 1717ab6..4590d12 100644 (file)
@@ -357,11 +357,6 @@ void mdt_fs_cleanup(const struct lu_context *ctxt,
 int mdt_client_free(const struct lu_context *ctxt,
                     struct mdt_device *mdt,
                     struct mdt_export_data *med);
-
-int mdt_update_server_data(const struct lu_context *ctxt,
-                           struct mdt_device *mdt,
-                           int sync);
-
 int mdt_client_add(const struct lu_context *ctxt,
                    struct mdt_device *mdt,
                    struct mdt_export_data *med,
@@ -376,7 +371,7 @@ int mdt_lock_new_child(struct mdt_thread_info *info,
 int mdt_reint_open(struct mdt_thread_info *info);
 
 void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
-                   struct mdt_file_data *mfd);
+                   struct mdt_file_data *mfd, struct md_attr *ma);
 
 int mdt_close(struct mdt_thread_info *info);
 
index d0797b0..a618fe7 100644 (file)
@@ -105,31 +105,26 @@ int mdt_handle_last_unlink(struct mdt_thread_info *info, struct mdt_object *mo,
         const struct lu_attr *la = &ma->ma_attr;
         ENTRY;
 
-
-        /* if this is the last unlinked object reference,
-         * so client should destroy ost objects*/
-        if (S_ISREG(la->la_mode) &&
-            la->la_nlink == 0 && mo->mot_header.loh_ref == 1) {
-
-                CDEBUG(D_INODE, "Last reference is released on "DFID"\n",
-                                PFID(mdt_object_fid(mo)));
-
-                CDEBUG(D_INODE, "ma_valid = "LPX64"\n", ma->ma_valid);
-                repbody = req_capsule_server_get(&info->mti_pill,
-                                                 &RMF_MDT_BODY);
-                if (ma->ma_valid & MA_INODE)
-                        mdt_pack_attr2body(repbody, la, mdt_object_fid(mo));
-                if (/*ma->ma_lmm_size && */(ma->ma_valid & MA_LOV)) {
-                        LASSERT(ma->ma_lmm_size);
-                        mdt_dump_lmm(D_INFO, ma->ma_lmm);
-                        repbody->eadatasize = ma->ma_lmm_size;
+        repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
+        
+        if (ma->ma_valid & MA_INODE)
+                mdt_pack_attr2body(repbody, la, mdt_object_fid(mo));
+
+        if (ma->ma_valid & MA_LOV) {
+                LASSERT(ma->ma_lmm_size);
+                mdt_dump_lmm(D_INFO, ma->ma_lmm);
+                repbody->eadatasize = ma->ma_lmm_size;
+                if (S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)))
                         repbody->valid |= OBD_MD_FLEASIZE;
-                }
-
-                if (ma->ma_cookie_size && (ma->ma_valid & MA_COOKIE))
-                        repbody->valid |= OBD_MD_FLCOOKIE;
+                else if (S_ISDIR(lu_object_attr(&mo->mot_obj.mo_lu)))
+                        repbody->valid |= OBD_MD_FLDIREA;
+                else 
+                        LBUG();
         }
-
+        
+        if (ma->ma_cookie_size && (ma->ma_valid & MA_COOKIE))
+                repbody->valid |= OBD_MD_FLCOOKIE;
+        
         RETURN(0);
 }
 
index 74af20a..0fe38f1 100644 (file)
@@ -290,7 +290,9 @@ static int mdt_mfd_open(struct mdt_thread_info *info,
                 /* keep a reference on this object for this open,
                 * and is released by mdt_mfd_close() */
                 mdt_object_get(info->mti_ctxt, o);
-
+                /* open hanling */
+                mo_open(info->mti_ctxt, mdt_object_child(o));
+                
                 mfd->mfd_mode = flags;
                 mfd->mfd_object = o;
                 mfd->mfd_xid = mdt_info_req(info)->rq_xid;
@@ -470,8 +472,9 @@ out:
         return result;
 }
 
-void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
-                   struct mdt_file_data *mfd)
+void mdt_mfd_close(const struct lu_context *ctxt,
+                   struct mdt_device *mdt, struct mdt_file_data *mfd,
+                   struct md_attr *ma)
 {
         struct mdt_object *o = mfd->mfd_object;
         ENTRY;
@@ -481,22 +484,18 @@ void mdt_mfd_close(const struct lu_context *ctxt, struct mdt_device *mdt,
         } else if (mfd->mfd_mode & MDS_FMODE_EXEC) {
                 mdt_allow_write_access(o);
         }
-
-        /* release reference on this object.
-         * it will be destroyed by lower layer if necessary.
-         */
-        mdt_object_put(ctxt, mfd->mfd_object);
-
+        
         mdt_mfd_free(mfd);
-        EXIT;
+
+        mo_close(ctxt, mdt_object_child(o), ma);
 }
 
 int mdt_close(struct mdt_thread_info *info)
 {
-        struct md_attr         *ma = &info->mti_attr;
         struct mdt_export_data *med;
         struct mdt_file_data   *mfd;
         struct mdt_object      *o;
+        struct md_attr         *ma = &info->mti_attr;
         int rc;
         ENTRY;
 
@@ -514,17 +513,17 @@ int mdt_close(struct mdt_thread_info *info)
                 class_handle_unhash(&mfd->mfd_handle);
                 list_del_init(&mfd->mfd_list);
                 spin_unlock(&med->med_open_lock);
-
-                o = mfd->mfd_object;
+                
                 ma->ma_lmm = req_capsule_server_get(&info->mti_pill,
-                                                        &RMF_MDT_MD);
+                                                    &RMF_MDT_MD);
                 ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
-                                                       &RMF_MDT_MD, RCL_SERVER);
-                rc = mo_attr_get(info->mti_ctxt, mdt_object_child(o), ma);
-                if (rc == 0)
-                        rc = mdt_handle_last_unlink(info, o, ma);
-
-                mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd);
+                                                       &RMF_MDT_MD,
+                                                       RCL_SERVER);
+                o = mfd->mfd_object;
+                mdt_mfd_close(info->mti_ctxt, info->mti_mdt, mfd, ma);
+                rc = mdt_handle_last_unlink(info, o, ma);
+                /* release reference on this object. */
+                mdt_object_put(info->mti_ctxt, o);
         }
         mdt_shrink_reply(info);
         RETURN(rc);