Whamcloud - gitweb
LU-1303 lod: introduce lod device
[fs/lustre-release.git] / lustre / mdt / mdt_lib.c
index b513d44..b734013 100644 (file)
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
@@ -28,6 +26,8 @@
 /*
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  * Author: Fan Yong <fanyong@clusterfs.com>
  */
 
-
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include "mdt_internal.h"
@@ -528,31 +524,34 @@ int mdt_init_ucred_reint(struct mdt_thread_info *info)
 void mdt_dump_lmm(int level, const struct lov_mds_md *lmm)
 {
         const struct lov_ost_data_v1 *lod;
-        int i;
-        __s16 stripe_count =
-                le16_to_cpu(((struct lov_user_md*)lmm)->lmm_stripe_count);
+        int                           i;
+        __u16                         count;
+
+        count = le16_to_cpu(((struct lov_user_md*)lmm)->lmm_stripe_count);
 
         CDEBUG(level, "objid "LPX64", magic 0x%08X, pattern %#X\n",
                le64_to_cpu(lmm->lmm_object_id), le32_to_cpu(lmm->lmm_magic),
                le32_to_cpu(lmm->lmm_pattern));
         CDEBUG(level,"stripe_size=0x%x, stripe_count=0x%x\n",
-               le32_to_cpu(lmm->lmm_stripe_size),
-               le32_to_cpu(lmm->lmm_stripe_count));
-        LASSERT(stripe_count <= (__s16)LOV_MAX_STRIPE_COUNT);
-        for (i = 0, lod = lmm->lmm_objects; i < stripe_count; i++, lod++) {
+               le32_to_cpu(lmm->lmm_stripe_size), count);
+        if (count == LOV_ALL_STRIPES)
+                return;
+        LASSERT(count <= LOV_MAX_STRIPE_COUNT);
+        for (i = 0, lod = lmm->lmm_objects; i < count; i++, lod++)
                 CDEBUG(level, "stripe %u idx %u subobj "LPX64"/"LPX64"\n",
                        i, le32_to_cpu(lod->l_ost_idx),
                        le64_to_cpu(lod->l_object_seq),
                        le64_to_cpu(lod->l_object_id));
-        }
 }
 
-void mdt_shrink_reply(struct mdt_thread_info *info)
+/* Shrink and/or grow reply buffers */
+int mdt_fix_reply(struct mdt_thread_info *info)
 {
         struct req_capsule *pill = info->mti_pill;
         struct mdt_body    *body;
-        int                md_size;
+        int                md_size, md_packed = 0;
         int                acl_size;
+        int                rc = 0;
         ENTRY;
 
         body = req_capsule_server_get(pill, &RMF_MDT_BODY);
@@ -572,10 +571,10 @@ void mdt_shrink_reply(struct mdt_thread_info *info)
         }
 
         CDEBUG(D_INFO, "Shrink to md_size = %d cookie/acl_size = %d"
-                        " MDSCAPA = "LPX64", OSSCAPA = "LPX64"\n",
+                        " MDSCAPA = %llx, OSSCAPA = %llx\n",
                         md_size, acl_size,
-                        body->valid & OBD_MD_FLMDSCAPA,
-                        body->valid & OBD_MD_FLOSSCAPA);
+                        (unsigned long long)(body->valid & OBD_MD_FLMDSCAPA),
+                        (unsigned long long)(body->valid & OBD_MD_FLOSSCAPA));
 /*
             &RMF_MDT_BODY,
             &RMF_MDT_MD,
@@ -585,9 +584,24 @@ void mdt_shrink_reply(struct mdt_thread_info *info)
 (optional)  something else
 */
 
-        if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
-                req_capsule_shrink(pill, &RMF_MDT_MD, md_size,
-                                   RCL_SERVER);
+        /* MDT_MD buffer may be bigger than packed value, let's shrink all
+         * buffers before growing it */
+       if (info->mti_big_lmm_used) {
+                LASSERT(req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER));
+                md_packed = req_capsule_get_size(pill, &RMF_MDT_MD,
+                                                 RCL_SERVER);
+                LASSERT(md_packed > 0);
+                /* buffer must be allocated separately */
+                LASSERT(info->mti_attr.ma_lmm !=
+                        req_capsule_server_get(pill, &RMF_MDT_MD));
+                req_capsule_shrink(pill, &RMF_MDT_MD, 0, RCL_SERVER);
+                /* big lmm is unused if md_size is 0, so clear the flag */
+                if (md_size == 0)
+                       info->mti_big_lmm_used = 0;
+        } else if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
+                req_capsule_shrink(pill, &RMF_MDT_MD, md_size, RCL_SERVER);
+        }
+
         if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
                 req_capsule_shrink(pill, &RMF_ACL, acl_size, RCL_SERVER);
         else if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
@@ -606,7 +620,38 @@ void mdt_shrink_reply(struct mdt_thread_info *info)
          * Some more field should be shrinked if needed.
          * This should be done by those who added fields to reply message.
          */
-        EXIT;
+
+        /* Finally, grow the MD buffer if needed */
+       if (info->mti_big_lmm_used) {
+                void *lmm;
+
+                LASSERT(md_size > md_packed);
+                CDEBUG(D_INFO, "Enlarge reply buffer, need extra %d bytes\n",
+                       md_size - md_packed);
+                rc = req_capsule_server_grow(pill, &RMF_MDT_MD, md_size);
+                if (rc) {
+                        /* we can't answer with proper LOV EA, drop flags,
+                         * the rc is also returned so this request is
+                         * considered as failed */
+                        body->valid &= ~(OBD_MD_FLDIREA | OBD_MD_FLEASIZE);
+                        /* don't return transno along with error */
+                        lustre_msg_set_transno(pill->rc_req->rq_repmsg, 0);
+                } else {
+                        /* now we need to pack right LOV EA */
+                        lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
+                        LASSERT(req_capsule_get_size(pill, &RMF_MDT_MD,
+                                                     RCL_SERVER) ==
+                                info->mti_attr.ma_lmm_size);
+                        memcpy(lmm, info->mti_attr.ma_lmm,
+                               info->mti_attr.ma_lmm_size);
+                }
+                /* update mdt_max_mdsize so clients become aware of it */
+                if (info->mti_mdt->mdt_max_mdsize < info->mti_attr.ma_lmm_size)
+                        info->mti_mdt->mdt_max_mdsize =
+                                                    info->mti_attr.ma_lmm_size;
+               info->mti_big_lmm_used = 0;
+        }
+        RETURN(rc);
 }
 
 
@@ -728,7 +773,7 @@ static __u64 mdt_attr_valid_xlate(__u64 in, struct mdt_reint_record *rr,
                 out |= LA_BLOCKS;
 
         if (in & ATTR_FROM_OPEN)
-                rr->rr_flags |= MRF_SETATTR_LOCKED;
+                rr->rr_flags |= MRF_OPEN_TRUNC;
 
         if (in & ATTR_ATIME_SET)
                 out |= LA_ATIME;
@@ -742,10 +787,16 @@ static __u64 mdt_attr_valid_xlate(__u64 in, struct mdt_reint_record *rr,
         if (in & ATTR_ATTR_FLAG)
                 out |= LA_FLAGS;
 
+        if (in & ATTR_KILL_SUID)
+                out |= LA_KILL_SUID;
+
+        if (in & ATTR_KILL_SGID)
+                out |= LA_KILL_SGID;
+
         if (in & MDS_OPEN_OWNEROVERRIDE)
                 ma->ma_attr_flags |= MDS_OPEN_OWNEROVERRIDE;
 
-        if (in & (ATTR_KILL_SUID|ATTR_KILL_SGID))
+        if (in & ATTR_FORCE)
                 ma->ma_attr_flags |= MDS_PERM_BYPASS;
 
         /*XXX need ATTR_RAW?*/
@@ -828,6 +879,7 @@ static inline int mdt_dlmreq_unpack(struct mdt_thread_info *info) {
 
 static int mdt_setattr_unpack(struct mdt_thread_info *info)
 {
+        struct mdt_reint_record *rr = &info->mti_rr;
         struct md_attr          *ma = &info->mti_attr;
         struct req_capsule      *pill = info->mti_pill;
         int rc;
@@ -840,10 +892,15 @@ static int mdt_setattr_unpack(struct mdt_thread_info *info)
         /* Epoch may be absent */
         mdt_ioepoch_unpack(info);
 
-        ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT);
-        if (ma->ma_lmm_size) {
-                ma->ma_lmm = req_capsule_client_get(pill, &RMF_EADATA);
-                ma->ma_valid |= MA_LOV;
+        if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
+                rr->rr_eadata = req_capsule_client_get(pill, &RMF_EADATA);
+                rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
+                                                        RCL_CLIENT);
+                ma->ma_lmm_size = rr->rr_eadatalen;
+                if (ma->ma_lmm_size > 0) {
+                        ma->ma_lmm = (void *)rr->rr_eadata;
+                        ma->ma_valid |= MA_LOV;
+                }
         }
 
         ma->ma_cookie_size = req_capsule_get_size(pill, &RMF_LOGCOOKIES,
@@ -903,7 +960,7 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
         attr->la_valid = LA_MODE | LA_RDEV | LA_UID | LA_GID |
                          LA_CTIME | LA_MTIME | LA_ATIME;
         memset(&sp->u, 0, sizeof(sp->u));
-        sp->sp_cr_flags = rec->cr_flags;
+        sp->sp_cr_flags = get_mrc_cr_flags(rec);
         sp->sp_ck_split = !!(rec->cr_bias & MDS_CHECK_SPLIT);
         info->mti_cross_ref = !!(rec->cr_bias & MDS_CROSS_REF);
 
@@ -914,7 +971,8 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
 
         if (!info->mti_cross_ref) {
                 rr->rr_name = req_capsule_client_get(pill, &RMF_NAME);
-                rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
+                rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME,
+                                                      RCL_CLIENT) - 1;
                 LASSERT(rr->rr_name && rr->rr_namelen > 0);
         } else {
                 rr->rr_name = NULL;
@@ -928,9 +986,11 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                 req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
                 LASSERT(req_capsule_field_present(pill, &RMF_EADATA,
                                                   RCL_CLIENT));
-                sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA);
-                sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
-                                                             RCL_CLIENT);
+                rr->rr_eadata = req_capsule_client_get(pill, &RMF_EADATA);
+                rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
+                                                        RCL_CLIENT);
+                sp->u.sp_ea.eadata = rr->rr_eadata;
+                sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
                 sp->u.sp_ea.fid = rr->rr_fid1;
                 RETURN(0);
         }
@@ -945,11 +1005,13 @@ static int mdt_create_unpack(struct mdt_thread_info *info)
                        req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SLAVE);
                        LASSERT(req_capsule_field_present(pill, &RMF_EADATA,
                                                          RCL_CLIENT));
-                       sp->u.sp_ea.eadata = req_capsule_client_get(pill,
-                                                                   &RMF_EADATA);
-                       sp->u.sp_ea.eadatalen = req_capsule_get_size(pill,
-                                                                    &RMF_EADATA,
-                                                                    RCL_CLIENT);
+                       rr->rr_eadata = req_capsule_client_get(pill,
+                                                              &RMF_EADATA);
+                       rr->rr_eadatalen = req_capsule_get_size(pill,
+                                                               &RMF_EADATA,
+                                                               RCL_CLIENT);
+                       sp->u.sp_ea.eadata = rr->rr_eadata;
+                       sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
                        sp->u.sp_ea.fid = rr->rr_fid1;
                        RETURN(0);
                 }
@@ -1072,6 +1134,8 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info)
                 ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
 
         info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
+        /* the last unlink needs the LOV EA sent back */
+        rr->rr_eadatalen = info->mti_mdt->mdt_max_mdsize;
 
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
@@ -1132,6 +1196,8 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
                 ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
 
         info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info));
+        /* rename may contain unlink so we might need LOV EA sent back */
+        rr->rr_eadatalen = info->mti_mdt->mdt_max_mdsize;
 
         rc = mdt_dlmreq_unpack(info);
         RETURN(rc);
@@ -1172,7 +1238,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
         attr->la_valid = LA_MODE  | LA_RDEV  | LA_UID   | LA_GID |
                          LA_CTIME | LA_MTIME | LA_ATIME;
         memset(&info->mti_spec.u, 0, sizeof(info->mti_spec.u));
-        info->mti_spec.sp_cr_flags = rec->cr_flags;
+        info->mti_spec.sp_cr_flags = get_mrc_cr_flags(rec);
         /* Do not trigger ASSERTION if client miss to set such flags. */
         if (unlikely(info->mti_spec.sp_cr_flags == 0))
                 RETURN(-EPROTO);
@@ -1204,12 +1270,26 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
                 RETURN(-EFAULT);
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
 
-        sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
-                                                     RCL_CLIENT);
-        if (sp->u.sp_ea.eadatalen) {
-                sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA);
-                sp->no_create = !!req_is_replay(req);
-        }
+        if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
+                rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
+                                                        RCL_CLIENT);
+                if (rr->rr_eadatalen > 0) {
+                        rr->rr_eadata = req_capsule_client_get(pill,
+                                                               &RMF_EADATA);
+                        sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
+                        sp->u.sp_ea.eadata = rr->rr_eadata;
+                        sp->no_create = !!req_is_replay(req);
+                }
+
+                /*
+                 * The client's default md_size may be 0 right after client
+                 * start, until all OSCs are connected; set a reasonable value
+                 * here to prevent misbehavior.
+                 */
+                if (rr->rr_eadatalen == 0 &&
+                    !(info->mti_spec.sp_cr_flags & MDS_OPEN_DELAY_CREATE))
+                       rr->rr_eadatalen = MIN_MD_SIZE;
+       }
 
         RETURN(0);
 }
@@ -1256,11 +1336,20 @@ static int mdt_setxattr_unpack(struct mdt_thread_info *info)
         rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
         LASSERT(rr->rr_namelen > 0);
 
-        rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT);
-        if (rr->rr_eadatalen > 0) {
-                rr->rr_eadata = req_capsule_client_get(pill, &RMF_EADATA);
-                if (rr->rr_eadata == NULL)
-                        RETURN(-EFAULT);
+        if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
+                rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
+                                                        RCL_CLIENT);
+                if (rr->rr_eadatalen > 0) {
+                        rr->rr_eadata = req_capsule_client_get(pill,
+                                                               &RMF_EADATA);
+                        if (rr->rr_eadata == NULL)
+                                RETURN(-EFAULT);
+                } else {
+                        rr->rr_eadata = NULL;
+                }
+        } else if (!(attr->la_valid & OBD_MD_FLXATTRRM)) {
+                CDEBUG(D_INFO, "no xattr data supplied\n");
+                RETURN(-EFAULT);
         }
 
         RETURN(0);