Whamcloud - gitweb
LU-1396 osc: control the RPC rate between MDS and OST
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
index 38ec54a..b87fe6b 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -29,8 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011 Whamcloud, Inc.
- *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  * Author: Wang Di <wangdi@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <linux/module.h>
-#ifdef HAVE_EXT4_LDISKFS
 #include <ldiskfs/ldiskfs_jbd2.h>
-#else
-#include <linux/jbd.h>
-#endif
 #include <obd.h>
 #include <obd_class.h>
 #include <lustre_ver.h>
 #include <obd_support.h>
 #include <lprocfs_status.h>
-#ifdef HAVE_EXT4_LDISKFS
 #include <ldiskfs/ldiskfs.h>
-#else
-#include <linux/ldiskfs_fs.h>
-#endif
 #include <lustre_mds.h>
 #include <lustre/lustre_idl.h>
 #include <lustre_fid.h>
@@ -494,43 +480,6 @@ int mdd_link_sanity_check(const struct lu_env *env,
         RETURN(rc);
 }
 
-/**
- * If subdir count is up to ddp_max_nlink, then enable MNLINK_OBJ flag and
- * assign i_nlink to 1 which means the i_nlink for subdir count is incredible
- * (maybe too large to be represented). It is a trick to break through the
- * "i_nlink" limitation for subdir count.
- */
-void __mdd_ref_add(const struct lu_env *env, struct mdd_object *obj,
-                   struct thandle *handle)
-{
-        struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
-        struct mdd_device *m = mdd_obj2mdd_dev(obj);
-
-        if (!mdd_is_mnlink(obj)) {
-                if (S_ISDIR(mdd_object_type(obj))) {
-                        if (mdd_la_get(env, obj, tmp_la, BYPASS_CAPA))
-                                return;
-
-                        if (tmp_la->la_nlink >= m->mdd_dt_conf.ddp_max_nlink) {
-                                obj->mod_flags |= MNLINK_OBJ;
-                                tmp_la->la_nlink = 1;
-                                tmp_la->la_valid = LA_NLINK;
-                                mdd_attr_set_internal(env, obj, tmp_la, handle,
-                                                      0);
-                                return;
-                        }
-                }
-                mdo_ref_add(env, obj, handle);
-        }
-}
-
-void __mdd_ref_del(const struct lu_env *env, struct mdd_object *obj,
-                   struct thandle *handle, int is_dot)
-{
-        if (!mdd_is_mnlink(obj) || is_dot)
-                mdo_ref_del(env, obj, handle);
-}
-
 static int __mdd_index_delete_only(const struct lu_env *env, struct mdd_object *pobj,
                                    const char *name, struct thandle *handle,
                                    struct lustre_capa *capa)
@@ -584,7 +533,7 @@ static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
         rc = __mdd_index_insert_only(env, pobj, lf, name, handle, capa);
         if (rc == 0 && is_dir) {
                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
-                __mdd_ref_add(env, pobj, handle);
+                mdo_ref_add(env, pobj, handle);
                 mdd_write_unlock(env, pobj);
         }
         RETURN(rc);
@@ -600,12 +549,8 @@ static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
 
         rc = __mdd_index_delete_only(env, pobj, name, handle, capa);
         if (rc == 0 && is_dir) {
-                int is_dot = 0;
-
-                if (name != NULL && name[0] == '.' && name[1] == 0)
-                        is_dot = 1;
                 mdd_write_lock(env, pobj, MOR_TGT_PARENT);
-                __mdd_ref_del(env, pobj, handle, is_dot);
+                mdo_ref_del(env, pobj, handle);
                 mdd_write_unlock(env, pobj);
         }
 
@@ -622,14 +567,29 @@ int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
 
         LASSERT(mdd->mdd_capa);
 
+        /* XXX: Since we use the 'mdd_capa' as fake llog object here, we
+         *      have to set the parameter 'size' as INT_MAX or 0 to inform
+         *      OSD that this record write is for a llog write or catalog
+         *      header update, and osd declare function will reserve less
+         *      credits for optimization purpose.
+         *
+         *      Reserve 6 blocks for a llog write, since the llog file is
+         *      usually small, reserve 2 blocks for catalog header update,
+         *      because we know for sure that catalog header is already
+         *      allocated.
+         *
+         *      This hack should be removed in 2.3.
+         */
+
         /* record itself */
-        rc = dt_declare_record_write(env, mdd->mdd_capa, reclen, 0, handle);
+        rc = dt_declare_record_write(env, mdd->mdd_capa,
+                                     DECLARE_LLOG_WRITE, 0, handle);
         if (rc)
                 return rc;
 
         /* header will be updated as well */
-        rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE,
-                                     0, handle);
+        rc = dt_declare_record_write(env, mdd->mdd_capa,
+                                     DECLARE_LLOG_WRITE, 0, handle);
         if (rc)
                 return rc;
 
@@ -640,13 +600,13 @@ int mdd_declare_llog_record(const struct lu_env *env, struct mdd_device *mdd,
 
         /* new record referencing new plain llog */
         rc = dt_declare_record_write(env, mdd->mdd_capa,
-                                     sizeof(struct llog_logid_rec), 0, handle);
+                                     DECLARE_LLOG_WRITE, 0, handle);
         if (rc)
                 return rc;
 
         /* catalog's header will be updated as well */
-        rc = dt_declare_record_write(env, mdd->mdd_capa, LLOG_CHUNK_SIZE,
-                                     0, handle);
+        rc = dt_declare_record_write(env, mdd->mdd_capa,
+                                     DECLARE_LLOG_REWRITE, 0, handle);
 
         return rc;
 }
@@ -834,7 +794,7 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
         if (rc)
                 GOTO(out_unlock, rc);
 
-        __mdd_ref_add(env, mdd_sobj, handle);
+        mdo_ref_add(env, mdd_sobj, handle);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
@@ -896,6 +856,7 @@ int mdd_finish_unlink(const struct lu_env *env,
 {
         int rc;
         int reset = 1;
+        int is_dir = S_ISDIR(ma->ma_attr.la_mode);
         ENTRY;
 
         LASSERT(mdd_write_locked(env, obj) != 0);
@@ -903,7 +864,7 @@ int mdd_finish_unlink(const struct lu_env *env,
         /* read HSM flags, needed to set changelogs flags */
         ma->ma_need = MA_HSM | MA_INODE;
         rc = mdd_attr_get_internal(env, obj, ma);
-        if (rc == 0 && ma->ma_attr.la_nlink == 0) {
+        if (rc == 0 && (ma->ma_attr.la_nlink == 0 || is_dir)) {
                 obj->mod_flags |= DEAD_OBJ;
                 /* add new orphan and the object
                  * will be deleted during mdd_close() */
@@ -926,6 +887,9 @@ int mdd_finish_unlink(const struct lu_env *env,
                                 reset = 0;
                 }
 
+                /* get the i_nlink */
+                ma->ma_need = MA_INODE;
+                rc = mdd_attr_get_internal(env, obj, ma);
         }
         if (reset)
                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
@@ -1014,8 +978,8 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         int rc;
         ENTRY;
 
-        LASSERTF(mdd_object_exists(mdd_cobj) > 0, "FID is "DFID"\n",
-                 PFID(mdd_object_fid(mdd_cobj)));
+        if (mdd_object_exists(mdd_cobj) <= 0)
+                RETURN(-ENOENT);
 
         handle = mdd_trans_create(env, mdd);
         if (IS_ERR(handle))
@@ -1044,10 +1008,10 @@ static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
         if (rc)
                 GOTO(cleanup, rc);
 
-        __mdd_ref_del(env, mdd_cobj, handle, 0);
+        mdo_ref_del(env, mdd_cobj, handle);
         if (is_dir)
                 /* unlink dot */
-                __mdd_ref_del(env, mdd_cobj, handle, 1);
+                mdo_ref_del(env, mdd_cobj, handle);
 
         LASSERT(ma->ma_attr.la_valid & LA_CTIME);
         la->la_ctime = la->la_mtime = ma->ma_attr.la_ctime;
@@ -1109,6 +1073,16 @@ out_trans:
 
 stop:
         mdd_trans_stop(env, mdd, rc, handle);
+#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,3,50,0)
+       if (rc == 0 && ma->ma_valid & MA_COOKIE && ma->ma_valid & MA_LOV &&
+           ma->ma_valid & MA_FLAGS && ma->ma_attr_flags & MDS_UNLINK_DESTROY)
+               /* Since echo client is incapable of destorying ost object,
+                * it will destory the object here. */
+               rc = mdd_lovobj_unlink(env, mdd, mdd_cobj, la, ma, 1);
+#else
+#warning "please remove this after 2.4 (LOD/OSP)."
+#endif
+
 #ifdef HAVE_QUOTA_SUPPORT
         if (quota_opc)
                 /* Trigger dqrel on the owner of child and parent. If failed,
@@ -1148,11 +1122,11 @@ static int mdd_name_insert(const struct lu_env *env,
         const char *name = lname->ln_name;
         struct lu_attr   *la = &mdd_env_info(env)->mti_la_for_fix;
         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
-        struct mdd_device *mdd = mdo2mdd(pobj);
         struct dynlock_handle *dlh;
         struct thandle *handle;
         int is_dir = S_ISDIR(ma->ma_attr.la_mode);
 #ifdef HAVE_QUOTA_SUPPORT
+        struct mdd_device *mdd = mdo2mdd(pobj);
         struct md_ucred *uc = md_ucred(env);
         struct obd_device *obd = mdd->mdd_obd_dev;
         struct obd_export *exp = md_quota(env)->mq_exp;
@@ -1463,11 +1437,11 @@ static int mdd_rename_tgt(const struct lu_env *env,
          * it must be local one.
          */
         if (tobj && mdd_object_exists(mdd_tobj)) {
-                __mdd_ref_del(env, mdd_tobj, handle, 0);
+                mdo_ref_del(env, mdd_tobj, handle);
 
                 /* Remove dot reference. */
                 if (S_ISDIR(ma->ma_attr.la_mode))
-                        __mdd_ref_del(env, mdd_tobj, handle, 1);
+                        mdo_ref_del(env, mdd_tobj, handle);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
@@ -1561,7 +1535,6 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
         struct mdd_device *mdd = mdo2mdd(cobj);
         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
         struct mdd_object *son = md2mdd_obj(cobj);
-        struct lu_attr    *attr = &ma->ma_attr;
         struct lov_mds_md *lmm = NULL;
         int                lmm_size = 0;
         struct thandle    *handle;
@@ -1575,8 +1548,8 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
         if (!md_should_create(spec->sp_cr_flags))
                 RETURN(0);
         lmm_size = ma->ma_lmm_size;
-        rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
-                            spec, attr);
+
+        rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec, ma);
         if (rc)
                 RETURN(rc);
 
@@ -1621,6 +1594,8 @@ stop:
         mdd_trans_stop(env, mdd, rc, handle);
 out_free:
         /* Finish mdd_lov_create() stuff. */
+        /* if no_create == 0 (not replay), we free lmm allocated by
+         * mdd_lov_create() */
         mdd_lov_create_finish(env, mdd, lmm, lmm_size, spec);
         RETURN(rc);
 }
@@ -1647,7 +1622,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
         else if (unlikely(rc < 0)) {
                 CERROR("Object "DFID" locates on remote server\n",
                         PFID(mdo2fid(mdd_obj)));
-                LBUG();
+                RETURN(-EINVAL);
         }
 
         /* The common filename length check. */
@@ -1717,7 +1692,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
 
         if (S_ISDIR(ma->ma_attr.la_mode)) {
                 /* Add "." and ".." for newly created dir */
-                __mdd_ref_add(env, child, handle);
+                mdo_ref_add(env, child, handle);
                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
                                              dot, handle, BYPASS_CAPA);
                 if (rc == 0)
@@ -1725,7 +1700,7 @@ int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
                                                      dotdot, handle,
                                                      BYPASS_CAPA);
                 if (rc != 0)
-                        __mdd_ref_del(env, child, handle, 1);
+                        mdo_ref_del(env, child, handle);
         }
         if (rc == 0)
                 mdd_links_add(env, child, pfid, lname, handle, 1);
@@ -2008,6 +1983,9 @@ static int mdd_create(const struct lu_env *env,
         }
 #endif
 
+        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DQACQ_NET))
+                GOTO(out_pending, rc = -EINPROGRESS);
+
         /*
          * No RPC inside the transaction, so OST objects should be created at
          * first.
@@ -2015,7 +1993,7 @@ static int mdd_create(const struct lu_env *env,
         if (S_ISREG(attr->la_mode)) {
                 lmm_size = ma->ma_lmm_size;
                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
-                                    spec, attr);
+                                    spec, ma);
                 if (rc)
                         GOTO(out_pending, rc);
         }
@@ -2106,6 +2084,14 @@ static int mdd_create(const struct lu_env *env,
         }
         if (lmm && lmm_size > 0) {
                 /* Set Lov here, do not get lmm again later */
+                if (lmm_size > ma->ma_lmm_size) {
+                        /* Reply buffer is smaller, need bigger one */
+                        mdd_max_lmm_buffer(env, lmm_size);
+                        if (unlikely(info->mti_max_lmm == NULL))
+                                GOTO(cleanup, rc = -ENOMEM);
+                        ma->ma_lmm = info->mti_max_lmm;
+                        ma->ma_big_lmm_used = 1;
+                }
                 memcpy(ma->ma_lmm, lmm, lmm_size);
                 ma->ma_lmm_size = lmm_size;
                 ma->ma_valid |= MA_LOV;
@@ -2155,9 +2141,9 @@ cleanup:
 
                 if (rc2 == 0) {
                         mdd_write_lock(env, son, MOR_TGT_CHILD);
-                        __mdd_ref_del(env, son, handle, 0);
+                        mdo_ref_del(env, son, handle);
                         if (initialized && S_ISDIR(attr->la_mode))
-                                __mdd_ref_del(env, son, handle, 1);
+                                mdo_ref_del(env, son, handle);
                         mdd_write_unlock(env, son);
                 }
         }
@@ -2194,6 +2180,10 @@ out_pending:
                               quota_opc);
         }
 #endif
+        /* The child object shouldn't be cached anymore */
+        if (rc)
+                cfs_set_bit(LU_OBJECT_HEARD_BANSHEE,
+                            &child->mo_lu.lo_header->loh_flags);
         return rc;
 }
 
@@ -2575,11 +2565,11 @@ static int mdd_rename(const struct lu_env *env,
                         rc = -EINVAL;
                         goto cleanup;
                 }
-                __mdd_ref_del(env, mdd_tobj, handle, 0);
+                mdo_ref_del(env, mdd_tobj, handle);
 
                 /* Remove dot reference. */
                 if (is_dir)
-                        __mdd_ref_del(env, mdd_tobj, handle, 1);
+                        mdo_ref_del(env, mdd_tobj, handle);
 
                 la->la_valid = LA_CTIME;
                 rc = mdd_attr_check_set_internal(env, mdd_tobj, la, handle, 0);
@@ -3004,6 +2994,7 @@ const struct md_dir_operations mdd_dir_ops = {
         .mdo_rename        = mdd_rename,
         .mdo_link          = mdd_link,
         .mdo_unlink        = mdd_unlink,
+        .mdo_lum_lmm_cmp   = mdd_lum_lmm_cmp,
         .mdo_name_insert   = mdd_name_insert,
         .mdo_name_remove   = mdd_name_remove,
         .mdo_rename_tgt    = mdd_rename_tgt,