Whamcloud - gitweb
LU-1866 lfsck: LFSCK for namespace consistency (3)
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index 773940e..5fb1cf8 100644 (file)
@@ -60,9 +60,6 @@ static struct lu_name lname_dotdot = {
 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
                         const struct lu_name *lname, struct lu_fid* fid,
                         int mask);
-static int mdd_declare_links_add(const struct lu_env *env,
-                                 struct mdd_object *mdd_obj,
-                                 struct thandle *handle);
 static inline int mdd_links_add(const struct lu_env *env,
                                struct mdd_object *mdd_obj,
                                const struct lu_fid *pfid,
@@ -110,8 +107,8 @@ int mdd_lookup(const struct lu_env *env,
         RETURN(rc);
 }
 
-static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
-                          struct lu_fid *fid)
+int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
+                  struct lu_fid *fid)
 {
         return __mdd_lookup_locked(env, &obj->mod_obj, &lname_dotdot, fid, 0);
 }
@@ -165,16 +162,17 @@ static int mdd_is_parent(const struct lu_env *env,
                         GOTO(out, rc = 1);
                 if (parent)
                         mdd_object_put(env, parent);
-                parent = mdd_object_find(env, mdd, pfid);
-
-                /* cross-ref parent */
-                if (parent == NULL) {
-                        if (pf != NULL)
-                                *pf = *pfid;
-                        GOTO(out, rc = -EREMOTE);
-                } else if (IS_ERR(parent))
-                        GOTO(out, rc = PTR_ERR(parent));
-                p1 = parent;
+
+               parent = mdd_object_find(env, mdd, pfid);
+               if (IS_ERR(parent)) {
+                       GOTO(out, rc = PTR_ERR(parent));
+               } else if (mdd_object_remote(parent)) {
+                       /*FIXME: Because of the restriction of rename in Phase I.
+                        * If the parent is remote, we just assumed lf is not the
+                        * parent of P1 for now */
+                       GOTO(out, rc = 0);
+               }
+               p1 = parent;
         }
         EXIT;
 out:
@@ -296,7 +294,7 @@ int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
         int rc = 0;
         ENTRY;
 
-        if (cobj && mdd_object_exists(cobj))
+       if (cobj && mdd_object_exists(cobj))
                 RETURN(-EEXIST);
 
         if (mdd_is_dead_obj(pobj))
@@ -369,7 +367,34 @@ static inline int mdd_is_sticky(const struct lu_env *env,
        if (tmp_la->la_uid == uc->uc_fsuid)
                return 0;
 
-       return !mdd_capable(uc, CFS_CAP_FOWNER);
+       return !md_capable(uc, CFS_CAP_FOWNER);
+}
+
+static int mdd_may_delete_entry(const struct lu_env *env,
+                               struct mdd_object *pobj, int check_perm)
+{
+       ENTRY;
+
+       LASSERT(pobj != NULL);
+       if (!mdd_object_exists(pobj))
+               RETURN(-ENOENT);
+
+       if (mdd_is_dead_obj(pobj))
+               RETURN(-ENOENT);
+
+       if (check_perm) {
+               int rc;
+               rc = mdd_permission_internal_locked(env, pobj, NULL,
+                                           MAY_WRITE | MAY_EXEC,
+                                           MOR_TGT_PARENT);
+               if (rc)
+                       RETURN(rc);
+       }
+
+       if (mdd_is_append(pobj))
+               RETURN(-EPERM);
+
+       RETURN(0);
 }
 
 /*
@@ -383,31 +408,21 @@ int mdd_may_delete(const struct lu_env *env, struct mdd_object *pobj,
         int rc = 0;
         ENTRY;
 
-        LASSERT(cobj);
+       if (pobj) {
+               rc = mdd_may_delete_entry(env, pobj, check_perm);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       if (cobj == NULL)
+               RETURN(0);
+
         if (!mdd_object_exists(cobj))
                 RETURN(-ENOENT);
 
         if (mdd_is_dead_obj(cobj))
                 RETURN(-ESTALE);
 
-        if (pobj) {
-                if (!mdd_object_exists(pobj))
-                        RETURN(-ENOENT);
-
-                if (mdd_is_dead_obj(pobj))
-                        RETURN(-ENOENT);
-
-                if (check_perm) {
-                        rc = mdd_permission_internal_locked(env, pobj, NULL,
-                                                    MAY_WRITE | MAY_EXEC,
-                                                    MOR_TGT_PARENT);
-                        if (rc)
-                                RETURN(rc);
-                }
-
-                if (mdd_is_append(pobj))
-                        RETURN(-EPERM);
-        }
 
        if (mdd_is_sticky(env, pobj, cobj))
                 RETURN(-EPERM);
@@ -770,14 +785,10 @@ int mdd_changelog_ext_store(const struct lu_env *env, struct mdd_device *mdd,
  * \param tname - target name string
  * \param handle - transacion handle
  */
-static int mdd_changelog_ns_store(const struct lu_env  *env,
-                                 struct mdd_device    *mdd,
-                                 enum changelog_rec_type type,
-                                 unsigned flags,
-                                 struct mdd_object    *target,
-                                 struct mdd_object    *parent,
-                                 const struct lu_name *tname,
-                                 struct thandle *handle)
+int mdd_changelog_ns_store(const struct lu_env *env, struct mdd_device *mdd,
+                          enum changelog_rec_type type, unsigned flags,
+                          struct mdd_object *target, struct mdd_object *parent,
+                          const struct lu_name *tname, struct thandle *handle)
 {
        struct llog_changelog_rec *rec;
        struct lu_buf *buf;
@@ -1095,6 +1106,7 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
                               const struct lu_name *name, struct md_attr *ma,
                               struct thandle *handle)
 {
+       struct lu_attr     *la = &mdd_env_info(env)->mti_la_for_fix;
         int rc;
 
         rc = mdo_declare_index_delete(env, p, name->ln_name, handle);
@@ -1105,31 +1117,37 @@ static int mdd_declare_unlink(const struct lu_env *env, struct mdd_device *mdd,
         if (rc)
                 return rc;
 
-        rc = mdo_declare_ref_del(env, c, handle);
-        if (rc)
-                return rc;
+       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
+       la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdo_declare_attr_set(env, p, la, handle);
+       if (rc)
+               return rc;
 
-        rc = mdo_declare_ref_del(env, c, handle);
-        if (rc)
-                return rc;
+       if (c != NULL) {
+               rc = mdo_declare_ref_del(env, c, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdo_declare_attr_set(env, p, NULL, handle);
-        if (rc)
-                return rc;
+               rc = mdo_declare_ref_del(env, c, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdo_declare_attr_set(env, c, NULL, handle);
-        if (rc)
-                return rc;
+               rc = mdo_declare_attr_set(env, c, NULL, handle);
+               if (rc)
+                       return rc;
 
-        rc = mdd_declare_finish_unlink(env, c, ma, handle);
-        if (rc)
-                return rc;
+               rc = mdd_declare_finish_unlink(env, c, ma, handle);
+               if (rc)
+                       return rc;
 
-       rc = mdd_declare_links_del(env, c, handle);
-       if (rc != 0)
-               return rc;
+               rc = mdd_declare_links_del(env, c, handle);
+               if (rc != 0)
+                       return rc;
 
-       rc = mdd_declare_changelog_store(env, mdd, name, handle);
+               /* FIXME: need changelog for remove entry */
+               rc = mdd_declare_changelog_store(env, mdd, name, handle);
+       }
 
        return rc;
 }
@@ -1142,74 +1160,96 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
        struct lu_attr     *cattr = &mdd_env_info(env)->mti_cattr;
         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
-        struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
+       struct mdd_object *mdd_cobj = NULL;
         struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
         struct thandle    *handle;
-       int rc, is_dir;
+       int rc, is_dir = 0;
         ENTRY;
 
-        if (mdd_object_exists(mdd_cobj) <= 0)
-                RETURN(-ENOENT);
+       /* cobj == NULL means only delete name entry */
+       if (likely(cobj != NULL)) {
+               mdd_cobj = md2mdd_obj(cobj);
+               if (mdd_object_exists(mdd_cobj) == 0)
+                       RETURN(-ENOENT);
+               /* currently it is assume, it could only delete
+                * name entry of remote directory */
+               is_dir = 1;
+       }
 
-        handle = mdd_trans_create(env, mdd);
+       handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
                 RETURN(PTR_ERR(handle));
 
-        rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
-                                lname, ma, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_declare_unlink(env, mdd, mdd_pobj, mdd_cobj,
+                               lname, ma, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        rc = mdd_trans_start(env, mdd, handle);
-        if (rc)
-                GOTO(stop, rc);
+       rc = mdd_trans_start(env, mdd, handle);
+       if (rc)
+               GOTO(stop, rc);
 
-        dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
-        if (dlh == NULL)
+       dlh = mdd_pdo_write_lock(env, mdd_pobj, name, MOR_TGT_PARENT);
+       if (dlh == NULL)
                GOTO(stop, rc = -ENOMEM);
-        mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
 
-       /* fetch cattr */
-       rc = mdd_la_get(env, mdd_cobj, cattr, mdd_object_capa(env, mdd_cobj));
-        if (rc)
-                GOTO(cleanup, rc);
+       if (likely(mdd_cobj != NULL)) {
+               mdd_write_lock(env, mdd_cobj, MOR_TGT_CHILD);
+
+               /* fetch cattr */
+               rc = mdd_la_get(env, mdd_cobj, cattr,
+                               mdd_object_capa(env, mdd_cobj));
+               if (rc)
+                       GOTO(cleanup, rc);
+
+               is_dir = S_ISDIR(cattr->la_mode);
 
-       is_dir = S_ISDIR(cattr->la_mode);
+       }
 
        rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, cattr);
-        if (rc)
-                GOTO(cleanup, rc);
+       if (rc)
+               GOTO(cleanup, rc);
 
        rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
                                mdd_object_capa(env, mdd_pobj));
        if (rc)
                GOTO(cleanup, rc);
 
-       rc = mdo_ref_del(env, mdd_cobj, handle);
-       if (rc != 0) {
-               __mdd_index_insert_only(env, mdd_pobj, mdo2fid(mdd_cobj),
-                                       name, handle,
-                                       mdd_object_capa(env, mdd_pobj));
-               GOTO(cleanup, rc);
+       if (likely(mdd_cobj != NULL)) {
+               rc = mdo_ref_del(env, mdd_cobj, handle);
+               if (rc != 0) {
+                       __mdd_index_insert_only(env, mdd_pobj,
+                                               mdo2fid(mdd_cobj),
+                                               name, handle,
+                                               mdd_object_capa(env, mdd_pobj));
+                       GOTO(cleanup, rc);
+               }
+
+               if (is_dir)
+                       /* unlink dot */
+                       mdo_ref_del(env, mdd_cobj, handle);
+
+               /* fetch updated nlink */
+               rc = mdd_la_get(env, mdd_cobj, cattr,
+                               mdd_object_capa(env, mdd_cobj));
+               if (rc)
+                       GOTO(cleanup, rc);
        }
 
-        if (is_dir)
-                /* unlink dot */
-                mdo_ref_del(env, mdd_cobj, handle);
+       LASSERT(ma->ma_attr.la_valid & LA_CTIME);
+       la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
 
-       /* fetch updated nlink */
-       rc = mdd_la_get(env, mdd_cobj, cattr, mdd_object_capa(env, mdd_cobj));
+       la->la_valid = LA_CTIME | LA_MTIME;
+       rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
        if (rc)
                GOTO(cleanup, rc);
 
-        LASSERT(ma->ma_attr.la_valid & LA_CTIME);
-        la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
-
-        la->la_valid = LA_CTIME | LA_MTIME;
-       rc = mdd_attr_check_set_internal(env, mdd_pobj, la, handle, 0);
-        if (rc)
-                GOTO(cleanup, rc);
+       /* Enough for only unlink the entry */
+       if (unlikely(mdd_cobj == NULL)) {
+               mdd_pdo_write_unlock(env, mdd_pobj, dlh);
+               GOTO(stop, rc);
+       }
 
        if (cattr->la_nlink > 0 || mdd_cobj->mod_count > 0) {
                 /* update ctime of an unlinked file only if it is still
@@ -1369,14 +1409,12 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         if (unlikely(mdd_is_dead_obj(mdd_obj)))
                 RETURN(-ESTALE);
 
-        rc = mdd_object_exists(mdd_obj);
-        if (unlikely(rc == 0))
-                RETURN(-ESTALE);
-        else if (unlikely(rc < 0)) {
-                CERROR("Object "DFID" locates on remote server\n",
-                        PFID(mdo2fid(mdd_obj)));
-                RETURN(-EINVAL);
-        }
+       if (mdd_object_remote(mdd_obj)) {
+               CDEBUG(D_INFO, "%s: Object "DFID" locates on remote server\n",
+                      mdd2obd_dev(m)->obd_name, PFID(mdo2fid(mdd_obj)));
+       } else if (!mdd_object_exists(mdd_obj)) {
+               RETURN(-ESTALE);
+       }
 
         /* The common filename length check. */
         if (unlikely(lname->ln_namelen > m->mdd_dt_conf.ddp_max_name_len))
@@ -1403,12 +1441,14 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         RETURN(rc);
 }
 
-int mdd_declare_object_initialize(const struct lu_env *env,
-                                 struct mdd_object *child,
-                                 struct lu_attr *attr,
-                                 struct thandle *handle)
+static int mdd_declare_object_initialize(const struct lu_env *env,
+                                        struct mdd_object *parent,
+                                        struct mdd_object *child,
+                                        struct lu_attr *attr,
+                                        struct thandle *handle)
 {
         int rc;
+       ENTRY;
 
        /*
         * inode mode has been set in creation time, and it's based on umask,
@@ -1425,12 +1465,16 @@ int mdd_declare_object_initialize(const struct lu_env *env,
                                              dot, handle);
                 if (rc == 0)
                         rc = mdo_declare_ref_add(env, child, handle);
+
+               rc = mdo_declare_index_insert(env, child, mdo2fid(parent),
+                                             dotdot, handle);
         }
 
-        if (rc == 0)
+       if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
+                       fid_is_dot_lustre(mdo2fid(child))))
                 mdd_declare_links_add(env, child, handle);
 
-        return rc;
+       RETURN(rc);
 }
 
 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
@@ -1475,10 +1519,12 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                 if (rc != 0)
                         mdo_ref_del(env, child, handle);
         }
-        if (rc == 0)
-                mdd_links_add(env, child, pfid, lname, handle, 1);
 
-        RETURN(rc);
+       if (rc == 0 && (fid_is_norm(mdo2fid(child)) ||
+                       fid_is_dot_lustre(mdo2fid(child))))
+               mdd_links_add(env, child, pfid, lname, handle, 1);
+
+       RETURN(rc);
 }
 
 /* has not lock on pobj yet */
@@ -1605,7 +1651,7 @@ static int mdd_declare_create(const struct lu_env *env, struct mdd_device *mdd,
                        GOTO(out, rc);
         }
 
-       rc = mdd_declare_object_initialize(env, c, attr, handle);
+       rc = mdd_declare_object_initialize(env, p, c, attr, handle);
        if (rc)
                GOTO(out, rc);
 
@@ -1651,7 +1697,6 @@ out:
         return rc;
 }
 
-
 /*
  * Create object and insert it into namespace.
  */
@@ -2404,30 +2449,12 @@ cleanup_unlocked:
 
 stop:
         mdd_trans_stop(env, mdd, rc, handle);
-        if (mdd_sobj)
-                mdd_object_put(env, mdd_sobj);
 out_pending:
-        return rc;
+       mdd_object_put(env, mdd_sobj);
+       return rc;
 }
 
-/**
- * The data that link search is done on.
- */
-struct mdd_link_data {
-       /**
-        * Buffer to keep link EA body.
-        */
-       struct lu_buf           *ml_buf;
-       /**
-        * The matched header, entry and its lenght in the EA
-        */
-       struct link_ea_header   *ml_leh;
-       struct link_ea_entry    *ml_lee;
-       int                      ml_reclen;
-};
-
-static int mdd_links_new(const struct lu_env *env,
-                        struct mdd_link_data *ldata)
+int mdd_links_new(const struct lu_env *env, struct mdd_link_data *ldata)
 {
        ldata->ml_buf = mdd_buf_alloc(env, CFS_PAGE_SIZE);
        if (ldata->ml_buf->lb_buf == NULL)
@@ -2445,8 +2472,7 @@ static int mdd_links_new(const struct lu_env *env,
  *
  * \retval 0 or error
  */
-int mdd_links_read(const struct lu_env *env,
-                  struct mdd_object *mdd_obj,
+int mdd_links_read(const struct lu_env *env, struct mdd_object *mdd_obj,
                   struct mdd_link_data *ldata)
 {
        struct lustre_capa *capa;
@@ -2512,10 +2538,8 @@ struct lu_buf *mdd_links_get(const struct lu_env *env,
        return rc ? ERR_PTR(rc) : ldata.ml_buf;
 }
 
-static int mdd_links_write(const struct lu_env *env,
-                          struct mdd_object *mdd_obj,
-                          struct mdd_link_data *ldata,
-                          struct thandle *handle)
+int mdd_links_write(const struct lu_env *env, struct mdd_object *mdd_obj,
+                   struct mdd_link_data *ldata, struct thandle *handle)
 {
        const struct lu_buf *buf = mdd_buf_get_const(env, ldata->ml_buf->lb_buf,
                                                     ldata->ml_leh->leh_len);
@@ -2535,6 +2559,8 @@ static int mdd_lee_pack(struct link_ea_entry *lee, const struct lu_name *lname,
         int             reclen;
 
         fid_cpu_to_be(&tmpfid, pfid);
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_CRASH))
+               tmpfid.f_ver = ~0;
         memcpy(&lee->lee_parent_fid, &tmpfid, sizeof(tmpfid));
         memcpy(lee->lee_name, lname->ln_name, lname->ln_namelen);
         reclen = sizeof(struct link_ea_entry) + lname->ln_namelen;
@@ -2554,9 +2580,9 @@ void mdd_lee_unpack(const struct link_ea_entry *lee, int *reclen,
         lname->ln_namelen = *reclen - sizeof(struct link_ea_entry);
 }
 
-static int mdd_declare_links_add(const struct lu_env *env,
-                                 struct mdd_object *mdd_obj,
-                                 struct thandle *handle)
+int mdd_declare_links_add(const struct lu_env *env,
+                         struct mdd_object *mdd_obj,
+                         struct thandle *handle)
 {
         int rc;
 
@@ -2568,27 +2594,15 @@ static int mdd_declare_links_add(const struct lu_env *env,
         return rc;
 }
 
-/* For pathologic linkers, we don't want to spend lots of time scanning the
- * link ea.  Limit ourseleves to something reasonable; links not in the EA
- * can be looked up via (slower) parent lookup.
- */
-#define LINKEA_MAX_COUNT 128
-
 /** Add a record to the end of link ea buf */
-static int mdd_links_add_buf(const struct lu_env *env,
-                            struct mdd_link_data *ldata,
-                            const struct lu_name *lname,
-                            const struct lu_fid *pfid)
+int mdd_links_add_buf(const struct lu_env *env, struct mdd_link_data *ldata,
+                     const struct lu_name *lname, const struct lu_fid *pfid)
 {
        LASSERT(ldata->ml_leh != NULL);
 
        if (lname == NULL || pfid == NULL)
                return -EINVAL;
 
-       /* Make sure our buf is big enough for the new one */
-       if (ldata->ml_leh->leh_reccount > LINKEA_MAX_COUNT)
-               return -EOVERFLOW;
-
        ldata->ml_reclen = lname->ln_namelen + sizeof(struct link_ea_entry);
        if (ldata->ml_leh->leh_len + ldata->ml_reclen >
            ldata->ml_buf->lb_len) {
@@ -2608,9 +2622,8 @@ static int mdd_links_add_buf(const struct lu_env *env,
 }
 
 /** Del the current record from the link ea buf */
-static void mdd_links_del_buf(const struct lu_env *env,
-                             struct mdd_link_data *ldata,
-                             const struct lu_name *lname)
+void mdd_links_del_buf(const struct lu_env *env, struct mdd_link_data *ldata,
+                      const struct lu_name *lname)
 {
        LASSERT(ldata->ml_leh != NULL);
 
@@ -2636,11 +2649,9 @@ static void mdd_links_del_buf(const struct lu_env *env,
  * \retval -ENOENT link does not exist
  * \retval -ve on error
  */
-static int mdd_links_find(const struct lu_env  *env,
-                         struct mdd_object    *mdd_obj,
-                         struct mdd_link_data *ldata,
-                         const struct lu_name *lname,
-                         const struct lu_fid  *pfid)
+int mdd_links_find(const struct lu_env *env, struct mdd_object *mdd_obj,
+                  struct mdd_link_data *ldata, const struct lu_name *lname,
+                  const struct lu_fid  *pfid)
 {
        struct lu_name *tmpname = &mdd_env_info(env)->mti_name2;
        struct lu_fid  *tmpfid = &mdd_env_info(env)->mti_fid;
@@ -2699,6 +2710,14 @@ static int __mdd_links_add(const struct lu_env *env,
                        return -EEXIST;
        }
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LINKEA_MORE)) {
+               struct lu_fid *tfid = &mdd_env_info(env)->mti_fid2;
+
+               *tfid = *pfid;
+               tfid->f_ver = ~0;
+               mdd_links_add_buf(env, ldata, lname, tfid);
+       }
+
        return mdd_links_add_buf(env, ldata, lname, pfid);
 }
 
@@ -2739,6 +2758,9 @@ static int mdd_links_rename(const struct lu_env *env,
        int rc = 0;
        ENTRY;
 
+       if (OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF))
+               return 0;
+
        LASSERT(oldpfid != NULL || newpfid != NULL);
 
        if (mdd_obj->mod_flags & DEAD_OBJ)