Whamcloud - gitweb
LU-10513 acl: prepare small buffer for ACL RPC reply 16/28116/11
author: Fan Yong <fan.yong@intel.com>
Mon, 15 Jan 2018 14:45:37 +0000 (22:45 +0800)
committer: Oleg Drokin <oleg.drokin@intel.com>
Tue, 6 Feb 2018 04:26:49 +0000 (04:26 +0000)
Most files have only a few ACL entries, so in that case it is
unnecessary to prepare a very large reply buffer to hold ACL
entries of unknown size for the getattr/open RPCs.
Instead, we can prepare a relatively small buffer, such as
LUSTRE_POSIX_ACL_MAX_SIZE_OLD (260) bytes, which is equal to the
ACL size before patch 64b2fad22a4eb4727315709e014d8f74c5a7f289.
If the target file has too many ACL entries to fit in the
prepared reply buffer, the MDT replies to the client with
-ERANGE, and the client can then prepare a larger buffer
and try again. Since files with large ACLs are rare, retrying
such getattr/open RPCs will not affect real performance
too much.

The advantage is that this reduces client-side RAM pressure.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I4c01b19520cab1cc712e36f3b0225973fba00410
Reviewed-on: https://review.whamcloud.com/28116
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/mdc/mdc_locks.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/sec.c

index d5f054c..2c151df 100644 (file)
@@ -43,6 +43,7 @@
 #include <lustre_net.h>
 #include <lustre_req_layout.h>
 #include <lustre_swab.h>
+#include <lustre_acl.h>
 
 #include "mdc_internal.h"
 
@@ -244,7 +245,7 @@ int mdc_save_lovea(struct ptlrpc_request *req,
 
 static struct ptlrpc_request *
 mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
-                    struct md_op_data *op_data)
+                    struct md_op_data *op_data, __u32 acl_bufsize)
 {
        struct ptlrpc_request   *req;
        struct obd_device       *obddev = class_exp2obd(exp);
@@ -336,8 +337,7 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
 
        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
                             obddev->u.cli.cl_max_mds_easize);
-       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
-                            req->rq_import->imp_connect_data.ocd_max_easize);
+       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
         ptlrpc_request_set_replen(req);
         return req;
 }
@@ -431,9 +431,9 @@ static struct ptlrpc_request *mdc_intent_unlink_pack(struct obd_export *exp,
        RETURN(req);
 }
 
-static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
-                                                      struct lookup_intent *it,
-                                                      struct md_op_data *op_data)
+static struct ptlrpc_request *
+mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
+                       struct md_op_data *op_data, __u32 acl_bufsize)
 {
        struct ptlrpc_request   *req;
        struct obd_device       *obddev = class_exp2obd(exp);
@@ -472,8 +472,7 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
        mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
 
        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
-       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
-                            req->rq_import->imp_connect_data.ocd_max_easize);
+       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
        ptlrpc_request_set_replen(req);
        RETURN(req);
 }
@@ -768,6 +767,8 @@ static int mdc_enqueue_base(struct obd_export *exp,
                                  .l_inodebits = { MDS_INODELOCK_XATTR } };
        int generation, resends = 0;
        struct ldlm_reply *lockrep;
+       struct obd_import *imp = class_exp2cliimp(exp);
+       __u32 acl_bufsize;
        enum lvb_type lvb_type = 0;
        int rc;
        ENTRY;
@@ -790,24 +791,29 @@ static int mdc_enqueue_base(struct obd_export *exp,
                        policy = &lookup_policy;
        }
 
-        generation = obddev->u.cli.cl_import->imp_generation;
+       generation = obddev->u.cli.cl_import->imp_generation;
+       if (!it || (it->it_op & (IT_CREAT | IT_OPEN_CREAT)))
+               acl_bufsize = imp->imp_connect_data.ocd_max_easize;
+       else
+               acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
+
 resend:
-        flags = saved_flags;
+       flags = saved_flags;
        if (it == NULL) {
                /* The only way right now is FLOCK. */
                LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
                         einfo->ei_type);
                res_id.name[3] = LDLM_FLOCK;
        } else if (it->it_op & IT_OPEN) {
-               req = mdc_intent_open_pack(exp, it, op_data);
+               req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
        } else if (it->it_op & IT_UNLINK) {
                req = mdc_intent_unlink_pack(exp, it, op_data);
        } else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
-               req = mdc_intent_getattr_pack(exp, it, op_data);
+               req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
        } else if (it->it_op & IT_READDIR) {
                req = mdc_enqueue_pack(exp, 0);
        } else if (it->it_op & IT_LAYOUT) {
-               if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
+               if (!imp_connect_lvb_type(imp))
                        RETURN(-EOPNOTSUPP);
                req = mdc_intent_layout_pack(exp, it, op_data);
                lvb_type = LVB_T_LAYOUT;
@@ -909,6 +915,15 @@ resend:
                }
        }
 
+       if ((int)lockrep->lock_policy_res2 == -ERANGE &&
+           it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
+           acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
+               mdc_clear_replay_flag(req, -ERANGE);
+               ptlrpc_req_finished(req);
+               acl_bufsize = imp->imp_connect_data.ocd_max_easize;
+               goto resend;
+       }
+
        rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
        if (rc < 0) {
                if (lustre_handle_is_used(lockh)) {
@@ -1266,7 +1281,10 @@ int mdc_intent_getattr_async(struct obd_export *exp,
                PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
 
        fid_build_reg_res_name(&op_data->op_fid1, &res_id);
-       req = mdc_intent_getattr_pack(exp, it, op_data);
+       /* If the MDT returns -ERANGE because of a large ACL, the initiator
+        * of the async getattr RPC will handle that by itself. */
+       req = mdc_intent_getattr_pack(exp, it, op_data,
+                                     LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));
 
index b21c2b2..6dd1a06 100644 (file)
@@ -192,20 +192,34 @@ static int mdc_getattr_common(struct obd_export *exp,
         RETURN(0);
 }
 
+static void mdc_reset_acl_req(struct ptlrpc_request *req)
+{
+       spin_lock(&req->rq_early_free_lock);
+       sptlrpc_cli_free_repbuf(req);
+       req->rq_repbuf = NULL;
+       req->rq_repbuf_len = 0;
+       req->rq_repdata = NULL;
+       req->rq_reqdata_len = 0;
+       spin_unlock(&req->rq_early_free_lock);
+}
+
 static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
                       struct ptlrpc_request **request)
 {
-        struct ptlrpc_request *req;
-        int                    rc;
-        ENTRY;
+       struct ptlrpc_request *req;
+       struct obd_import *imp = class_exp2cliimp(exp);
+       __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
+       int rc;
+       ENTRY;
 
        /* Single MDS without an LMV case */
        if (op_data->op_flags & MF_GET_MDT_IDX) {
                op_data->op_mds = 0;
                RETURN(0);
        }
-        *request = NULL;
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
+
+       *request = NULL;
+       req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -215,33 +229,42 @@ static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
                 RETURN(rc);
         }
 
+again:
        mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
                      op_data->op_mode, -1, 0);
+       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
+       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                            op_data->op_mode);
+       ptlrpc_request_set_replen(req);
 
-       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
-                            req->rq_import->imp_connect_data.ocd_max_easize);
-        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
-                             op_data->op_mode);
-        ptlrpc_request_set_replen(req);
+       rc = mdc_getattr_common(exp, req);
+       if (rc) {
+               if (rc == -ERANGE &&
+                   acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
+                       acl_bufsize = imp->imp_connect_data.ocd_max_easize;
+                       mdc_reset_acl_req(req);
+                       goto again;
+               }
 
-        rc = mdc_getattr_common(exp, req);
-        if (rc)
-                ptlrpc_req_finished(req);
-        else
-                *request = req;
-        RETURN(rc);
+               ptlrpc_req_finished(req);
+       } else {
+               *request = req;
+       }
+
+       RETURN(rc);
 }
 
 static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
                            struct ptlrpc_request **request)
 {
-        struct ptlrpc_request *req;
-        int                    rc;
-        ENTRY;
+       struct ptlrpc_request *req;
+       struct obd_import *imp = class_exp2cliimp(exp);
+       __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
+       int rc;
+       ENTRY;
 
-        *request = NULL;
-        req = ptlrpc_request_alloc(class_exp2cliimp(exp),
-                                   &RQF_MDS_GETATTR_NAME);
+       *request = NULL;
+       req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR_NAME);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
@@ -254,9 +277,6 @@ static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
                 RETURN(rc);
         }
 
-       mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
-                     op_data->op_mode, op_data->op_suppgids[0], 0);
-
         if (op_data->op_name) {
                 char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
                 LASSERT(strnlen(op_data->op_name, op_data->op_namelen) ==
@@ -264,18 +284,29 @@ static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
                 memcpy(name, op_data->op_name, op_data->op_namelen);
         }
 
-        req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
-                             op_data->op_mode);
-       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
-                            req->rq_import->imp_connect_data.ocd_max_easize);
-        ptlrpc_request_set_replen(req);
+again:
+       mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
+                     op_data->op_mode, op_data->op_suppgids[0], 0);
+       req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+                            op_data->op_mode);
+       req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
+       ptlrpc_request_set_replen(req);
 
-        rc = mdc_getattr_common(exp, req);
-        if (rc)
-                ptlrpc_req_finished(req);
-        else
-                *request = req;
-        RETURN(rc);
+       rc = mdc_getattr_common(exp, req);
+       if (rc) {
+               if (rc == -ERANGE &&
+                   acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
+                       acl_bufsize = imp->imp_connect_data.ocd_max_easize;
+                       mdc_reset_acl_req(req);
+                       goto again;
+               }
+
+               ptlrpc_req_finished(req);
+       } else {
+               *request = req;
+       }
+
+       RETURN(rc);
 }
 
 static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
index 2bac82f..826dd04 100644 (file)
@@ -539,13 +539,13 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
        struct md_object        *next = mdt_object_child(o);
        struct lu_buf           *buf = &info->mti_buf;
        struct mdt_device       *mdt = info->mti_mdt;
+       struct req_capsule *pill = info->mti_pill;
        int rc;
 
        ENTRY;
 
-       buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
-       buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
-                                          RCL_SERVER);
+       buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
+       buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
        if (buf->lb_len == 0)
                RETURN(0);
 
@@ -593,6 +593,36 @@ again:
                               mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
                }
        } else {
+               int client;
+               int server;
+               int acl_buflen;
+               int lmm_buflen = 0;
+               int lmmsize = 0;
+
+               acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
+               if (acl_buflen >= rc)
+                       goto map;
+
+               /* If LOV/LMA EA is small, we can reuse part of their buffer */
+               client = ptlrpc_req_get_repsize(pill->rc_req);
+               server = lustre_packed_msg_size(pill->rc_req->rq_repmsg);
+               if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
+                       lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD,
+                                                         RCL_SERVER);
+                       lmmsize = repbody->mbo_eadatasize;
+               }
+
+               if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) {
+                       CDEBUG(D_INODE, "%s: client prepared buffer size %d "
+                              "is not big enough with the ACL size %d (%d)\n",
+                              mdt_obd_name(mdt), client, rc,
+                              server - acl_buflen - lmm_buflen + rc + lmmsize);
+                       repbody->mbo_aclsize = 0;
+                       repbody->mbo_valid &= ~OBD_MD_FLACL;
+                       RETURN(-ERANGE);
+               }
+
+map:
                if (buf->lb_buf == info->mti_big_acl)
                        info->mti_big_acl_used = 1;
 
@@ -603,6 +633,8 @@ again:
                        CERROR("%s: nodemap_map_acl unable to parse "DFID
                               " ACL: rc = %d\n", mdt_obd_name(mdt),
                               PFID(mdt_object_fid(o)), rc);
+                       repbody->mbo_aclsize = 0;
+                       repbody->mbo_valid &= ~OBD_MD_FLACL;
                } else {
                        repbody->mbo_aclsize = rc;
                        repbody->mbo_valid |= OBD_MD_FLACL;
index ce07f42..4781872 100644 (file)
@@ -185,6 +185,7 @@ __u32 lustre_packed_msg_size(struct lustre_msg *msg)
                 return 0;
         }
 }
+EXPORT_SYMBOL(lustre_packed_msg_size);
 
 void lustre_init_msg_v2(struct lustre_msg_v2 *msg, int count, __u32 *lens,
                         char **bufs)
index e693d0e..5728e5e 100644 (file)
@@ -1726,6 +1726,7 @@ void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req)
         req->rq_repmsg = NULL;
         EXIT;
 }
+EXPORT_SYMBOL(sptlrpc_cli_free_repbuf);
 
 int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
                                 struct ptlrpc_cli_ctx *ctx)