#include <lustre_net.h>
#include <lustre_req_layout.h>
#include <lustre_swab.h>
+#include <lustre_acl.h>
#include "mdc_internal.h"
static struct ptlrpc_request *
mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
- struct md_op_data *op_data)
+ struct md_op_data *op_data, __u32 acl_bufsize)
{
struct ptlrpc_request *req;
struct obd_device *obddev = class_exp2obd(exp);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
obddev->u.cli.cl_max_mds_easize);
- req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
- req->rq_import->imp_connect_data.ocd_max_easize);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
ptlrpc_request_set_replen(req);
return req;
}
RETURN(req);
}
-static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp,
- struct lookup_intent *it,
- struct md_op_data *op_data)
+static struct ptlrpc_request *
+mdc_intent_getattr_pack(struct obd_export *exp, struct lookup_intent *it,
+ struct md_op_data *op_data, __u32 acl_bufsize)
{
struct ptlrpc_request *req;
struct obd_device *obddev = class_exp2obd(exp);
mdc_getattr_pack(req, valid, it->it_flags, op_data, easize);
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER, easize);
- req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
- req->rq_import->imp_connect_data.ocd_max_easize);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
ptlrpc_request_set_replen(req);
RETURN(req);
}
.l_inodebits = { MDS_INODELOCK_XATTR } };
int generation, resends = 0;
struct ldlm_reply *lockrep;
+ struct obd_import *imp = class_exp2cliimp(exp);
+ __u32 acl_bufsize;
enum lvb_type lvb_type = 0;
int rc;
ENTRY;
policy = &lookup_policy;
}
- generation = obddev->u.cli.cl_import->imp_generation;
+ generation = obddev->u.cli.cl_import->imp_generation;
+ if (!it || (it->it_op & (IT_CREAT | IT_OPEN_CREAT)))
+ acl_bufsize = imp->imp_connect_data.ocd_max_easize;
+ else
+ acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
+
resend:
- flags = saved_flags;
+ flags = saved_flags;
if (it == NULL) {
/* The only way right now is FLOCK. */
LASSERTF(einfo->ei_type == LDLM_FLOCK, "lock type %d\n",
einfo->ei_type);
res_id.name[3] = LDLM_FLOCK;
} else if (it->it_op & IT_OPEN) {
- req = mdc_intent_open_pack(exp, it, op_data);
+ req = mdc_intent_open_pack(exp, it, op_data, acl_bufsize);
} else if (it->it_op & IT_UNLINK) {
req = mdc_intent_unlink_pack(exp, it, op_data);
} else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) {
- req = mdc_intent_getattr_pack(exp, it, op_data);
+ req = mdc_intent_getattr_pack(exp, it, op_data, acl_bufsize);
} else if (it->it_op & IT_READDIR) {
req = mdc_enqueue_pack(exp, 0);
} else if (it->it_op & IT_LAYOUT) {
- if (!imp_connect_lvb_type(class_exp2cliimp(exp)))
+ if (!imp_connect_lvb_type(imp))
RETURN(-EOPNOTSUPP);
req = mdc_intent_layout_pack(exp, it, op_data);
lvb_type = LVB_T_LAYOUT;
}
}
+ if ((int)lockrep->lock_policy_res2 == -ERANGE &&
+ it->it_op & (IT_OPEN | IT_GETATTR | IT_LOOKUP) &&
+ acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
+ mdc_clear_replay_flag(req, -ERANGE);
+ ptlrpc_req_finished(req);
+ acl_bufsize = imp->imp_connect_data.ocd_max_easize;
+ goto resend;
+ }
+
rc = mdc_finish_enqueue(exp, req, einfo, it, lockh, rc);
if (rc < 0) {
if (lustre_handle_is_used(lockh)) {
PFID(&op_data->op_fid1), ldlm_it2str(it->it_op), it->it_flags);
fid_build_reg_res_name(&op_data->op_fid1, &res_id);
- req = mdc_intent_getattr_pack(exp, it, op_data);
+ /* If the MDT return -ERANGE because of large ACL, then the sponsor
+ * of the async getattr RPC will handle that by itself. */
+ req = mdc_intent_getattr_pack(exp, it, op_data,
+ LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
if (IS_ERR(req))
RETURN(PTR_ERR(req));
RETURN(0);
}
+static void mdc_reset_acl_req(struct ptlrpc_request *req)
+{
+ spin_lock(&req->rq_early_free_lock);
+ sptlrpc_cli_free_repbuf(req);
+ req->rq_repbuf = NULL;
+ req->rq_repbuf_len = 0;
+ req->rq_repdata = NULL;
+ req->rq_reqdata_len = 0;
+ spin_unlock(&req->rq_early_free_lock);
+}
+
static int mdc_getattr(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct ptlrpc_request *req;
- int rc;
- ENTRY;
+ struct ptlrpc_request *req;
+ struct obd_import *imp = class_exp2cliimp(exp);
+ __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
+ int rc;
+ ENTRY;
/* Single MDS without an LMV case */
if (op_data->op_flags & MF_GET_MDT_IDX) {
op_data->op_mds = 0;
RETURN(0);
}
- *request = NULL;
- req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_MDS_GETATTR);
+
+ *request = NULL;
+ req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR);
if (req == NULL)
RETURN(-ENOMEM);
RETURN(rc);
}
+again:
mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
op_data->op_mode, -1, 0);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
+ req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+ op_data->op_mode);
+ ptlrpc_request_set_replen(req);
- req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
- req->rq_import->imp_connect_data.ocd_max_easize);
- req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
- op_data->op_mode);
- ptlrpc_request_set_replen(req);
+ rc = mdc_getattr_common(exp, req);
+ if (rc) {
+ if (rc == -ERANGE &&
+ acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
+ acl_bufsize = imp->imp_connect_data.ocd_max_easize;
+ mdc_reset_acl_req(req);
+ goto again;
+ }
- rc = mdc_getattr_common(exp, req);
- if (rc)
- ptlrpc_req_finished(req);
- else
- *request = req;
- RETURN(rc);
+ ptlrpc_req_finished(req);
+ } else {
+ *request = req;
+ }
+
+ RETURN(rc);
}
static int mdc_getattr_name(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- struct ptlrpc_request *req;
- int rc;
- ENTRY;
+ struct ptlrpc_request *req;
+ struct obd_import *imp = class_exp2cliimp(exp);
+ __u32 acl_bufsize = LUSTRE_POSIX_ACL_MAX_SIZE_OLD;
+ int rc;
+ ENTRY;
- *request = NULL;
- req = ptlrpc_request_alloc(class_exp2cliimp(exp),
- &RQF_MDS_GETATTR_NAME);
+ *request = NULL;
+ req = ptlrpc_request_alloc(imp, &RQF_MDS_GETATTR_NAME);
if (req == NULL)
RETURN(-ENOMEM);
RETURN(rc);
}
- mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
- op_data->op_mode, op_data->op_suppgids[0], 0);
-
if (op_data->op_name) {
char *name = req_capsule_client_get(&req->rq_pill, &RMF_NAME);
LASSERT(strnlen(op_data->op_name, op_data->op_namelen) ==
memcpy(name, op_data->op_name, op_data->op_namelen);
}
- req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
- op_data->op_mode);
- req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER,
- req->rq_import->imp_connect_data.ocd_max_easize);
- ptlrpc_request_set_replen(req);
+again:
+ mdc_pack_body(req, &op_data->op_fid1, op_data->op_valid,
+ op_data->op_mode, op_data->op_suppgids[0], 0);
+ req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+ op_data->op_mode);
+ req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
+ ptlrpc_request_set_replen(req);
- rc = mdc_getattr_common(exp, req);
- if (rc)
- ptlrpc_req_finished(req);
- else
- *request = req;
- RETURN(rc);
+ rc = mdc_getattr_common(exp, req);
+ if (rc) {
+ if (rc == -ERANGE &&
+ acl_bufsize != imp->imp_connect_data.ocd_max_easize) {
+ acl_bufsize = imp->imp_connect_data.ocd_max_easize;
+ mdc_reset_acl_req(req);
+ goto again;
+ }
+
+ ptlrpc_req_finished(req);
+ } else {
+ *request = req;
+ }
+
+ RETURN(rc);
}
static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
struct md_object *next = mdt_object_child(o);
struct lu_buf *buf = &info->mti_buf;
struct mdt_device *mdt = info->mti_mdt;
+ struct req_capsule *pill = info->mti_pill;
int rc;
ENTRY;
- buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_ACL);
- buf->lb_len = req_capsule_get_size(info->mti_pill, &RMF_ACL,
- RCL_SERVER);
+ buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
+ buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
if (buf->lb_len == 0)
RETURN(0);
mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
}
} else {
+ int client;
+ int server;
+ int acl_buflen;
+ int lmm_buflen = 0;
+ int lmmsize = 0;
+
+ acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
+ if (acl_buflen >= rc)
+ goto map;
+
+ /* If LOV/LMA EA is small, we can reuse part of their buffer */
+ client = ptlrpc_req_get_repsize(pill->rc_req);
+ server = lustre_packed_msg_size(pill->rc_req->rq_repmsg);
+ if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
+ lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD,
+ RCL_SERVER);
+ lmmsize = repbody->mbo_eadatasize;
+ }
+
+ if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) {
+ CDEBUG(D_INODE, "%s: client prepared buffer size %d "
+ "is not big enough with the ACL size %d (%d)\n",
+ mdt_obd_name(mdt), client, rc,
+ server - acl_buflen - lmm_buflen + rc + lmmsize);
+ repbody->mbo_aclsize = 0;
+ repbody->mbo_valid &= ~OBD_MD_FLACL;
+ RETURN(-ERANGE);
+ }
+
+map:
if (buf->lb_buf == info->mti_big_acl)
info->mti_big_acl_used = 1;
CERROR("%s: nodemap_map_acl unable to parse "DFID
" ACL: rc = %d\n", mdt_obd_name(mdt),
PFID(mdt_object_fid(o)), rc);
+ repbody->mbo_aclsize = 0;
+ repbody->mbo_valid &= ~OBD_MD_FLACL;
} else {
repbody->mbo_aclsize = rc;
repbody->mbo_valid |= OBD_MD_FLACL;
return 0;
}
}
+EXPORT_SYMBOL(lustre_packed_msg_size);
void lustre_init_msg_v2(struct lustre_msg_v2 *msg, int count, __u32 *lens,
char **bufs)
req->rq_repmsg = NULL;
EXIT;
}
+EXPORT_SYMBOL(sptlrpc_cli_free_repbuf);
int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
struct ptlrpc_cli_ctx *ctx)