Whamcloud - gitweb
LU-5040 osd: fix osd declare credit for quota
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
index e4412fb..b542546 100644 (file)
@@ -73,6 +73,7 @@
 #include <lustre_quota.h>
 
 #include <ldiskfs/xattr.h>
+#include <lustre_linkea.h>
 
 int ldiskfs_pdo = 1;
 CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
@@ -558,7 +559,7 @@ static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
 
        LINVRNT(osd_invariant(obj));
        LASSERT(obj->oo_inode == NULL);
-       LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));
+       LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID"\n", PFID(fid));
 
        dev = osd_dev(ldev);
        scrub = &dev->od_scrub;
@@ -890,7 +891,7 @@ static int osd_param_is_not_sane(const struct osd_device *dev,
 {
        struct osd_thandle *oh = container_of(th, typeof(*oh), ot_super);
 
-       return oh->ot_credits > osd_journal(dev)->j_max_transaction_buffers;
+       return oh->ot_credits > osd_transaction_size(dev);
 }
 
 /*
@@ -1040,7 +1041,7 @@ int osd_trans_start(const struct lu_env *env, struct dt_device *d,
                 *
                 *     This should be removed when we can calculate the
                 *     credits precisely. */
-               oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
+               oh->ot_credits = osd_transaction_size(dev);
        }
 
         /*
@@ -1741,7 +1742,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
        qid_t                   gid;
        long long               bspace;
        int                     rc = 0;
-       bool                    allocated;
+       bool                    enforce;
        ENTRY;
 
        LASSERT(dt != NULL);
@@ -1769,18 +1770,19 @@ static int osd_declare_attr_set(const struct lu_env *env,
         * We still need to call the osd_declare_qid() to calculate the journal
         * credits for updating quota accounting files and to trigger quota
         * space adjustment once the operation is completed.*/
-       if ((attr->la_valid & LA_UID) != 0 &&
-            attr->la_uid != (uid = i_uid_read(obj->oo_inode))) {
+       if (attr->la_valid & LA_UID || attr->la_valid & LA_GID) {
+               /* USERQUOTA */
+               uid = i_uid_read(obj->oo_inode);
                qi->lqi_type = USRQUOTA;
-
+               enforce = (attr->la_valid & LA_UID) && (attr->la_uid != uid);
                /* inode accounting */
                qi->lqi_is_blk = false;
 
-               /* one more inode for the new owner ... */
+               /* one more inode for the new uid ... */
                qi->lqi_id.qid_uid = attr->la_uid;
                qi->lqi_space      = 1;
-               allocated = (attr->la_uid == 0) ? true : false;
-               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               /* Reserve credits for the new uid */
+               rc = osd_declare_qid(env, oh, qi, NULL, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1789,7 +1791,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
                /* and one less inode for the current uid */
                qi->lqi_id.qid_uid = uid;
                qi->lqi_space      = -1;
-               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1798,38 +1800,40 @@ static int osd_declare_attr_set(const struct lu_env *env,
                /* block accounting */
                qi->lqi_is_blk = true;
 
-               /* more blocks for the new owner ... */
+               /* more blocks for the new uid ... */
                qi->lqi_id.qid_uid = attr->la_uid;
                qi->lqi_space      = bspace;
-               allocated = (attr->la_uid == 0) ? true : false;
-               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               /*
+                * Credits for the new uid has been reserved, re-use "obj"
+                * to save credit reservation.
+                */
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
                        RETURN(rc);
 
-               /* and finally less blocks for the current owner */
+               /* and finally less blocks for the current uid */
                qi->lqi_id.qid_uid = uid;
                qi->lqi_space      = -bspace;
-               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
                        RETURN(rc);
-       }
 
-       if (attr->la_valid & LA_GID &&
-           attr->la_gid != (gid = i_gid_read(obj->oo_inode))) {
+               /* GROUP QUOTA */
+               gid = i_gid_read(obj->oo_inode);
                qi->lqi_type = GRPQUOTA;
+               enforce = (attr->la_valid & LA_GID) && (attr->la_gid != gid);
 
                /* inode accounting */
                qi->lqi_is_blk = false;
 
-               /* one more inode for the new group owner ... */
+               /* one more inode for the new gid ... */
                qi->lqi_id.qid_gid = attr->la_gid;
                qi->lqi_space      = 1;
-               allocated = (attr->la_gid == 0) ? true : false;
-               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               rc = osd_declare_qid(env, oh, qi, NULL, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1838,7 +1842,7 @@ static int osd_declare_attr_set(const struct lu_env *env,
                /* and one less inode for the current gid */
                qi->lqi_id.qid_gid = gid;
                qi->lqi_space      = -1;
-               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1847,20 +1851,19 @@ static int osd_declare_attr_set(const struct lu_env *env,
                /* block accounting */
                qi->lqi_is_blk = true;
 
-               /* more blocks for the new owner ... */
+               /* more blocks for the new gid ... */
                qi->lqi_id.qid_gid = attr->la_gid;
                qi->lqi_space      = bspace;
-               allocated = (attr->la_gid == 0) ? true : false;
-               rc = osd_declare_qid(env, oh, qi, allocated, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
                        RETURN(rc);
 
-               /* and finally less blocks for the current owner */
+               /* and finally less blocks for the current gid */
                qi->lqi_id.qid_gid = gid;
                qi->lqi_space      = -bspace;
-               rc = osd_declare_qid(env, oh, qi, true, NULL);
+               rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
                if (rc == -EDQUOT || rc == -EINPROGRESS)
                        rc = 0;
                if (rc)
@@ -1926,6 +1929,7 @@ static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
                struct iattr    iattr;
                int             rc;
 
+               ll_vfs_dq_init(inode);
                iattr.ia_valid = 0;
                if (attr->la_valid & LA_UID)
                        iattr.ia_valid |= ATTR_UID;
@@ -1937,8 +1941,8 @@ static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
                rc = ll_vfs_dq_transfer(inode, &iattr);
                if (rc) {
                        CERROR("%s: quota transfer failed: rc = %d. Is quota "
-                              "enforcement enabled on the ldiskfs filesystem?",
-                              inode->i_sb->s_id, rc);
+                              "enforcement enabled on the ldiskfs "
+                              "filesystem?\n", inode->i_sb->s_id, rc);
                        return rc;
                }
        }
@@ -1990,7 +1994,6 @@ static int osd_attr_set(const struct lu_env *env,
        }
 
         inode = obj->oo_inode;
-       ll_vfs_dq_init(inode);
 
        rc = osd_quota_transfer(inode, attr);
        if (rc)
@@ -2250,7 +2253,7 @@ static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
                  * should not happen since quota enforcement is no longer
                  * enabled on ldiskfs (lquota takes care of it).
                  */
-                LASSERTF(result == 0, "%d", result);
+               LASSERTF(result == 0, "%d\n", result);
                ll_dirty_inode(inode, I_DIRTY_DATASYNC);
         }
 
@@ -2380,7 +2383,7 @@ static int osd_declare_object_create(const struct lu_env *env,
                RETURN(0);
 
        rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh,
-                                  false, false, NULL, false);
+                                  osd_dt_obj(dt), false, NULL, false);
        if (rc != 0)
                RETURN(rc);
 
@@ -2451,12 +2454,12 @@ static int osd_declare_object_destroy(const struct lu_env *env,
                             osd_dto_credits_noquota[DTO_INDEX_DELETE] + 3);
        /* one less inode */
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
-                                  -1, oh, false, true, NULL, false);
+                                  -1, oh, obj, false, NULL, false);
        if (rc)
                RETURN(rc);
        /* data to be truncated */
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
-                                  0, oh, true, true, NULL, false);
+                                  0, oh, obj, true, NULL, false);
        RETURN(rc);
 }
 
@@ -2809,17 +2812,15 @@ static int osd_object_ref_add(const struct lu_env *env,
         * This also has to properly handle the case of inodes with nlink == 0
         * in case they are being linked into the PENDING directory
         */
-#ifdef I_LINKABLE
-       /* This is necessary to increment from i_nlink == 0 */
-       spin_lock(&inode->i_lock);
-       inode->i_state |= I_LINKABLE;
-       spin_unlock(&inode->i_lock);
-#endif
-
        spin_lock(&obj->oo_guard);
-       ldiskfs_inc_count(oh->ot_handle, inode);
-       if (!S_ISDIR(inode->i_mode))
-               LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX);
+       if (unlikely(inode->i_nlink == 0))
+               /* inc_nlink from 0 may cause WARN_ON */
+               set_nlink(inode, 1);
+       else {
+               ldiskfs_inc_count(oh->ot_handle, inode);
+               if (!S_ISDIR(inode->i_mode))
+                       LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX);
+       }
        spin_unlock(&obj->oo_guard);
 
        ll_dirty_inode(inode, I_DIRTY_DATASYNC);
@@ -2944,6 +2945,7 @@ static int osd_declare_xattr_set(const struct lu_env *env,
 {
        struct osd_thandle *oh;
        int credits;
+       struct super_block *sb = osd_sb(osd_dev(dt->do_lu.lo_dev));
 
        LASSERT(handle != NULL);
 
@@ -2959,13 +2961,16 @@ static int osd_declare_xattr_set(const struct lu_env *env,
        } else if (strcmp(name, XATTR_NAME_VERSION) == 0) {
                credits = 1;
        } else {
-               struct osd_device  *osd = osd_dev(dt->do_lu.lo_dev);
-               struct super_block *sb = osd_sb(osd);
                credits = osd_dto_credits_noquota[DTO_XATTR_SET];
                if (buf && buf->lb_len > sb->s_blocksize) {
                        credits *= (buf->lb_len + sb->s_blocksize - 1) >>
                                        sb->s_blocksize_bits;
                }
+               /*
+                * xattr set may involve inode quota change, reserve credits for
+                * dquot_initialize()
+                */
+               oh->ot_credits += LDISKFS_MAXQUOTAS_INIT_BLOCKS(sb);
        }
 
        osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET, credits);
@@ -3075,6 +3080,7 @@ static int osd_declare_xattr_del(const struct lu_env *env,
                                  struct thandle *handle)
 {
         struct osd_thandle *oh;
+       struct super_block *sb = osd_sb(osd_dev(dt->do_lu.lo_dev));
 
        LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
         LASSERT(handle != NULL);
@@ -3084,6 +3090,11 @@ static int osd_declare_xattr_del(const struct lu_env *env,
 
        osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
                             osd_dto_credits_noquota[DTO_XATTR_SET]);
+       /*
+        * xattr del may involve inode quota change, reserve credits for
+        * dquot_initialize()
+        */
+       oh->ot_credits += LDISKFS_MAXQUOTAS_INIT_BLOCKS(sb);
 
        return 0;
 }
@@ -3553,7 +3564,7 @@ static int osd_index_declare_ea_delete(const struct lu_env *env,
        LASSERT(inode);
 
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
-                                  0, oh, true, true, NULL, false);
+                                  0, oh, osd_dt_obj(dt), true, NULL, false);
        RETURN(rc);
 }
 
@@ -4133,6 +4144,83 @@ int osd_add_oi_cache(struct osd_thread_info *info, struct osd_device *osd,
 }
 
 /**
+ * Get parent FID from the linkEA.
+ *
+ * For a directory which parent resides on remote MDT, to satisfy the
+ * local e2fsck, we insert it into the /REMOTE_PARENT_DIR locally. On
+ * the other hand, to make the lookup(..) on the directory can return
+ * the real parent FID, we append the real parent FID after its ".."
+ * name entry in the /REMOTE_PARENT_DIR.
+ *
+ * Unfortunately, such PFID-in-dirent cannot be preserved via file-level
+ * backup. So after the restore, we cannot get the right parent FID from
+ * its ".." name entry in the /REMOTE_PARENT_DIR. Under such case, since
+ * we have stored the real parent FID in the directory object's linkEA,
+ * we can parse the linkEA for the real parent FID.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] obj      pointer to the object to be handled
+ * \param[out]fid      pointer to the buffer to hold the parent FID
+ *
+ * \retval             0 for getting the real parent FID successfully
+ * \retval             negative error number on failure
+ */
+static int osd_get_pfid_from_linkea(const struct lu_env *env,
+                                   struct osd_object *obj,
+                                   struct lu_fid *fid)
+{
+       struct osd_thread_info  *oti    = osd_oti_get(env);
+       struct lu_buf           *buf    = &oti->oti_big_buf;
+       struct dentry           *dentry = &oti->oti_obj_dentry;
+       struct inode            *inode  = obj->oo_inode;
+       struct linkea_data       ldata  = { 0 };
+       int                      rc;
+       ENTRY;
+
+       fid_zero(fid);
+       if (!S_ISDIR(inode->i_mode))
+               RETURN(-EIO);
+
+again:
+       rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LINK,
+                            buf->lb_buf, buf->lb_len);
+       if (rc == -ERANGE) {
+               rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LINK,
+                                    NULL, 0);
+               if (rc > 0) {
+                       lu_buf_realloc(buf, rc);
+                       if (buf->lb_buf == NULL)
+                               RETURN(-ENOMEM);
+
+                       goto again;
+               }
+       }
+
+       if (unlikely(rc == 0))
+               RETURN(-ENODATA);
+
+       if (rc < 0)
+               RETURN(rc);
+
+       if (unlikely(buf->lb_buf == NULL)) {
+               lu_buf_realloc(buf, rc);
+               if (buf->lb_buf == NULL)
+                       RETURN(-ENOMEM);
+
+               goto again;
+       }
+
+       ldata.ld_buf = buf;
+       rc = linkea_init(&ldata);
+       if (rc == 0) {
+               linkea_first_entry(&ldata);
+               linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, NULL, fid);
+       }
+
+       RETURN(rc);
+}
+
+/**
  * Calls ->lookup() to find dentry. From dentry get inode and
  * read inode's ea to get fid. This is required for  interoperability
  * mode (b11826)
@@ -4186,10 +4274,18 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
 
                /* done with de, release bh */
                brelse(bh);
-               if (rc != 0)
-                       rc = osd_ea_fid_get(env, obj, ino, fid, id);
-               else
+               if (rc != 0) {
+                       if (unlikely(ino == osd_remote_parent_ino(dev)))
+                               /* If the parent is on remote MDT, and there
+                                * is no FID-in-dirent, then we have to get
+                                * the parent FID from the linkEA.  */
+                               rc = osd_get_pfid_from_linkea(env, obj, fid);
+                       else
+                               rc = osd_ea_fid_get(env, obj, ino, fid, id);
+               } else {
                        osd_id_gen(id, ino, OSD_OII_NOGEN);
+               }
+
                if (rc != 0) {
                        fid_zero(&oic->oic_fid);
                        GOTO(out, rc);
@@ -4320,8 +4416,8 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
                 * calculate how many blocks will be consumed by this index
                 * insert */
                rc = osd_declare_inode_qid(env, i_uid_read(inode),
-                                          i_gid_read(inode), 0,
-                                          oh, true, true, NULL, false);
+                                          i_gid_read(inode), 0, oh,
+                                          osd_dt_obj(dt), true, NULL, false);
        }
 
        if (fid == NULL)
@@ -4380,7 +4476,7 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
        if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
                RETURN(-EACCES);
 
-       LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid));
+       LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!\n", PFID(fid));
 
        rc = osd_remote_fid(env, osd, fid);
        if (rc < 0) {
@@ -5227,10 +5323,6 @@ again:
                GOTO(out_journal, rc);
        }
 
-       /* skip the REMOTE_PARENT_DIR. */
-       if (inode == dev->od_mdt_map->omm_remote_parent->d_inode)
-               GOTO(out_inode, rc = 0);
-
        rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
        if (rc == 0) {
                LASSERT(!(lma->lma_compat & LMAC_NOT_IN_OI));
@@ -5410,11 +5502,15 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
        int                     rc    = 0;
        ENTRY;
 
+       LASSERT(obj->oo_inode != dev->od_mdt_map->omm_remote_parent->d_inode);
+
        if (attr & LUDA_VERIFY) {
-               attr |= LUDA_TYPE;
-               if (unlikely(ino == osd_sb(dev)->s_root->d_inode->i_ino)) {
+               if (unlikely(ino == osd_remote_parent_ino(dev))) {
                        attr |= LUDA_IGNORE;
-                       rc = 0;
+                       /* If the parent is on remote MDT, and there
+                        * is no FID-in-dirent, then we have to get
+                        * the parent FID from the linkEA.  */
+                       osd_get_pfid_from_linkea(env, obj, fid);
                } else {
                        rc = osd_dirent_check_repair(env, obj, it, fid, id,
                                                     &attr);
@@ -5428,7 +5524,13 @@ static inline int osd_it_ea_rec(const struct lu_env *env,
                                   it->oie_dirent->oied_name[1] != '.'))
                                RETURN(-ENOENT);
 
-                       rc = osd_ea_fid_get(env, obj, ino, fid, id);
+                       if (unlikely(ino == osd_remote_parent_ino(dev)))
+                               /* If the parent is on remote MDT, and there
+                                * is no FID-in-dirent, then we have to get
+                                * the parent FID from the linkEA.  */
+                               rc = osd_get_pfid_from_linkea(env, obj, fid);
+                       else
+                               rc = osd_ea_fid_get(env, obj, ino, fid, id);
                } else {
                        osd_id_gen(id, ino, OSD_OII_NOGEN);
                }
@@ -5616,6 +5718,7 @@ static void osd_key_fini(const struct lu_context *ctx,
        OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
        lu_buf_free(&info->oti_iobuf.dr_pg_buf);
        lu_buf_free(&info->oti_iobuf.dr_bl_buf);
+       lu_buf_free(&info->oti_big_buf);
        OBD_FREE_PTR(info);
 }
 
@@ -6010,13 +6113,12 @@ static int osd_process_config(const struct lu_env *env,
                break;
        case LCFG_PARAM:
                LASSERT(&o->od_dt_dev);
-               rc = class_process_proc_seq_param(PARAM_OSD,
-                                                 lprocfs_osd_obd_vars,
-                                                 cfg, &o->od_dt_dev);
+               rc = class_process_proc_param(PARAM_OSD, lprocfs_osd_obd_vars,
+                                             cfg, &o->od_dt_dev);
                if (rc > 0 || rc == -ENOSYS)
-                       rc = class_process_proc_seq_param(PARAM_OST,
-                                                         lprocfs_osd_obd_vars,
-                                                         cfg, &o->od_dt_dev);
+                       rc = class_process_proc_param(PARAM_OST,
+                                                     lprocfs_osd_obd_vars,
+                                                     cfg, &o->od_dt_dev);
                break;
        default:
                rc = -ENOSYS;
@@ -6179,9 +6281,6 @@ static int __init osd_mod_init(void)
 
        rc = class_register_type(&osd_obd_device_ops, NULL, true,
                                 lprocfs_osd_module_vars,
-#ifndef HAVE_ONLY_PROCFS_SEQ
-                                NULL,
-#endif
                                 LUSTRE_OSD_LDISKFS_NAME, &osd_device_type);
        if (rc)
                lu_kmem_fini(ldiskfs_caches);