* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2014, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_MDC
-#ifdef __KERNEL__
-# include <linux/module.h>
-# include <linux/kernel.h>
-#else
-# include <liblustre.h>
-#endif
+#include <linux/module.h>
+#include <linux/kernel.h>
#include <obd_class.h>
#include "mdc_internal.h"
#include <lustre_fid.h>
/* mdc_setattr does its own semaphore handling */
-static int mdc_reint(struct ptlrpc_request *request,
- struct mdc_rpc_lock *rpc_lock,
- int level)
+static int mdc_reint(struct ptlrpc_request *request, int level)
{
int rc;
request->rq_send_state = level;
- mdc_get_rpc_lock(rpc_lock, NULL);
- rc = ptlrpc_queue_wait(request);
- mdc_put_rpc_lock(rpc_lock, NULL);
+ mdc_get_mod_rpc_slot(request, NULL);
+ rc = ptlrpc_queue_wait(request);
+ mdc_put_mod_rpc_slot(request, NULL);
if (rc)
CDEBUG(D_INFO, "error in handling %d\n", rc);
else if (!req_capsule_server_get(&request->rq_pill, &RMF_MDT_BODY)) {
* found by @fid. Found locks are added into @cancel list. Returns the amount of
* locks added to @cancels list. */
int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
- cfs_list_t *cancels, ldlm_mode_t mode,
+ struct list_head *cancels, ldlm_mode_t mode,
__u64 bits)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
- ldlm_policy_data_t policy = {{0}};
- struct ldlm_res_id res_id;
- struct ldlm_resource *res;
- int count;
- ENTRY;
+ ldlm_policy_data_t policy = { {0} };
+ struct ldlm_res_id res_id;
+ struct ldlm_resource *res;
+ int count;
+ ENTRY;
/* Return, i.e. cancel nothing, only if ELC is supported (flag in
* export) but disabled through procfs (flag in NS).
if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
RETURN(0);
- fid_build_reg_res_name(fid, &res_id);
- res = ldlm_resource_get(exp->exp_obd->obd_namespace,
- NULL, &res_id, 0, 0);
- if (res == NULL)
- RETURN(0);
- LDLM_RESOURCE_ADDREF(res);
- /* Initialize ibits lock policy. */
- policy.l_inodebits.bits = bits;
- count = ldlm_cancel_resource_local(res, cancels, &policy,
- mode, 0, 0, NULL);
- LDLM_RESOURCE_DELREF(res);
- ldlm_resource_putref(res);
- RETURN(count);
+ fid_build_reg_res_name(fid, &res_id);
+ res = ldlm_resource_get(exp->exp_obd->obd_namespace,
+ NULL, &res_id, 0, 0);
+ if (IS_ERR(res))
+ RETURN(0);
+ LDLM_RESOURCE_ADDREF(res);
+ /* Initialize ibits lock policy. */
+ policy.l_inodebits.bits = bits;
+ count = ldlm_cancel_resource_local(res, cancels, &policy,
+ mode, 0, 0, NULL);
+ LDLM_RESOURCE_DELREF(res);
+ ldlm_resource_putref(res);
+ RETURN(count);
}
int mdc_setattr(struct obd_export *exp, struct md_op_data *op_data,
- void *ea, int ealen, void *ea2, int ea2len,
- struct ptlrpc_request **request, struct md_open_data **mod)
+ void *ea, size_t ealen, struct ptlrpc_request **request)
{
- CFS_LIST_HEAD(cancels);
+ struct list_head cancels = LIST_HEAD_INIT(cancels);
struct ptlrpc_request *req;
- struct mdc_rpc_lock *rpc_lock;
- struct obd_device *obd = exp->exp_obd;
int count = 0, rc;
__u64 bits;
ENTRY;
if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
bits |= MDS_INODELOCK_LOOKUP;
if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
- (fid_is_sane(&op_data->op_fid1)) &&
- !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+ (fid_is_sane(&op_data->op_fid1)))
count = mdc_resource_get_unused(exp, &op_data->op_fid1,
&cancels, LCK_EX, bits);
req = ptlrpc_request_alloc(class_exp2cliimp(exp),
ldlm_lock_list_put(&cancels, l_bl_ast, count);
RETURN(-ENOMEM);
}
- mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
- if ((op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN)) == 0)
- req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT,
- 0);
- req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
- req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT,
- ea2len);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_MDT_EPOCH, RCL_CLIENT, 0);
+ req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT, ealen);
+ req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_CLIENT, 0);
rc = mdc_prep_elc_req(exp, req, MDS_REINT, &cancels, count);
if (rc) {
RETURN(rc);
}
- rpc_lock = obd->u.cli.cl_rpc_lock;
-
if (op_data->op_attr.ia_valid & (ATTR_MTIME | ATTR_CTIME))
CDEBUG(D_INODE, "setting mtime "CFS_TIME_T
", ctime "CFS_TIME_T"\n",
LTIME_S(op_data->op_attr.ia_mtime),
LTIME_S(op_data->op_attr.ia_ctime));
- mdc_setattr_pack(req, op_data, ea, ealen, ea2, ea2len);
+ mdc_setattr_pack(req, op_data, ea, ealen);
ptlrpc_request_set_replen(req);
- if (mod && (op_data->op_flags & MF_EPOCH_OPEN) &&
- req->rq_import->imp_replayable)
- {
- LASSERT(*mod == NULL);
-
- *mod = obd_mod_alloc();
- if (*mod == NULL) {
- DEBUG_REQ(D_ERROR, req, "Can't allocate "
- "md_open_data");
- } else {
- req->rq_replay = 1;
- req->rq_cb_data = *mod;
- (*mod)->mod_open_req = req;
- req->rq_commit_cb = mdc_commit_open;
- /**
- * Take an extra reference on \var mod, it protects \var
- * mod from being freed on eviction (commit callback is
- * called despite rq_replay flag).
- * Will be put on mdc_done_writing().
- */
- obd_mod_get(*mod);
- }
- }
- rc = mdc_reint(req, rpc_lock, LUSTRE_IMP_FULL);
-
- /* Save the obtained info in the original RPC for the replay case. */
- if (rc == 0 && (op_data->op_flags & MF_EPOCH_OPEN)) {
- struct mdt_ioepoch *epoch;
- struct mdt_body *body;
-
- epoch = req_capsule_client_get(&req->rq_pill, &RMF_MDT_EPOCH);
- body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- LASSERT(epoch != NULL);
- LASSERT(body != NULL);
- epoch->handle = body->handle;
- epoch->ioepoch = body->ioepoch;
- req->rq_replay_cb = mdc_replay_open;
- /** bug 3633, open may be committed and estale answer is not error */
- } else if (rc == -ESTALE && (op_data->op_flags & MF_SOM_CHANGE)) {
- rc = 0;
- } else if (rc == -ERESTARTSYS) {
+ rc = mdc_reint(req, LUSTRE_IMP_FULL);
+ if (rc == -ERESTARTSYS)
rc = 0;
- }
+
*request = req;
- if (rc && req->rq_commit_cb) {
- /* Put an extra reference on \var mod on error case. */
- obd_mod_put(*mod);
- req->rq_commit_cb(req);
- }
- RETURN(rc);
+
+ RETURN(rc);
}
int mdc_create(struct obd_export *exp, struct md_op_data *op_data,
- const void *data, int datalen, int mode, __u32 uid, __u32 gid,
- cfs_cap_t cap_effective, __u64 rdev,
- struct ptlrpc_request **request)
+ const void *data, size_t datalen,
+ umode_t mode, uid_t uid, gid_t gid,
+ cfs_cap_t cap_effective, __u64 rdev,
+ struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
int level, rc;
int count, resends = 0;
struct obd_import *import = exp->exp_obd->u.cli.cl_import;
int generation = import->imp_generation;
- CFS_LIST_HEAD(cancels);
+ struct list_head cancels = LIST_HEAD_INIT(cancels);
ENTRY;
- /* For case if upper layer did not alloc fid, do it now. */
- if (!fid_is_sane(&op_data->op_fid2)) {
- /*
- * mdc_fid_alloc() may return errno 1 in case of switch to new
- * sequence, handle this.
- */
- rc = mdc_fid_alloc(exp, &op_data->op_fid2, op_data);
- if (rc < 0) {
- CERROR("Can't alloc new fid, rc %d\n", rc);
- RETURN(rc);
- }
- }
+ /* For case if upper layer did not alloc fid, do it now. */
+ if (!fid_is_sane(&op_data->op_fid2)) {
+ /*
+ * mdc_fid_alloc() may return errno 1 in case of switch to new
+ * sequence, handle this.
+ */
+ rc = mdc_fid_alloc(NULL, exp, &op_data->op_fid2, op_data);
+ if (rc < 0)
+ RETURN(rc);
+ }
rebuild:
count = 0;
ldlm_lock_list_put(&cancels, l_bl_ast, count);
RETURN(-ENOMEM);
}
- mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+
req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
op_data->op_namelen + 1);
req_capsule_set_size(&req->rq_pill, &RMF_EADATA, RCL_CLIENT,
}
level = LUSTRE_IMP_FULL;
resend:
- rc = mdc_reint(req, exp->exp_obd->u.cli.cl_rpc_lock, level);
+ rc = mdc_reint(req, level);
/* Resend if we were told to. */
if (rc == -ERESTARTSYS) {
CDEBUG(D_HA, "resend cross eviction\n");
RETURN(-EIO);
}
- } else if (rc == 0) {
- struct mdt_body *body;
- struct lustre_capa *capa;
-
- body = req_capsule_server_get(&req->rq_pill, &RMF_MDT_BODY);
- LASSERT(body);
- if (body->valid & OBD_MD_FLMDSCAPA) {
- capa = req_capsule_server_get(&req->rq_pill,
- &RMF_CAPA1);
- if (capa == NULL)
- rc = -EPROTO;
- }
}
*request = req;
int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- CFS_LIST_HEAD(cancels);
+ struct list_head cancels = LIST_HEAD_INIT(cancels);
struct obd_device *obd = class_exp2obd(exp);
struct ptlrpc_request *req = *request;
int count = 0, rc;
LASSERT(req == NULL);
if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
- (fid_is_sane(&op_data->op_fid1)) &&
- !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+ (fid_is_sane(&op_data->op_fid1)))
count = mdc_resource_get_unused(exp, &op_data->op_fid1,
&cancels, LCK_EX,
MDS_INODELOCK_UPDATE);
if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
- (fid_is_sane(&op_data->op_fid3)) &&
- !OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET))
+ (fid_is_sane(&op_data->op_fid3)))
count += mdc_resource_get_unused(exp, &op_data->op_fid3,
&cancels, LCK_EX,
MDS_INODELOCK_FULL);
ldlm_lock_list_put(&cancels, l_bl_ast, count);
RETURN(-ENOMEM);
}
- mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
+
req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
op_data->op_namelen + 1);
RETURN(rc);
}
- mdc_unlink_pack(req, op_data);
+ mdc_unlink_pack(req, op_data);
- req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
- obd->u.cli.cl_max_mds_easize);
- req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
- obd->u.cli.cl_max_mds_cookiesize);
- ptlrpc_request_set_replen(req);
+ req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+ obd->u.cli.cl_default_mds_easize);
+ ptlrpc_request_set_replen(req);
*request = req;
- rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
+ rc = mdc_reint(req, LUSTRE_IMP_FULL);
if (rc == -ERESTARTSYS)
rc = 0;
RETURN(rc);
int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
- CFS_LIST_HEAD(cancels);
- struct obd_device *obd = exp->exp_obd;
+ struct list_head cancels = LIST_HEAD_INIT(cancels);
struct ptlrpc_request *req;
int count = 0, rc;
ENTRY;
ldlm_lock_list_put(&cancels, l_bl_ast, count);
RETURN(-ENOMEM);
}
- mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
- mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
+
req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT,
op_data->op_namelen + 1);
mdc_link_pack(req, op_data);
ptlrpc_request_set_replen(req);
- rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
+ rc = mdc_reint(req, LUSTRE_IMP_FULL);
*request = req;
if (rc == -ERESTARTSYS)
rc = 0;
}
int mdc_rename(struct obd_export *exp, struct md_op_data *op_data,
- const char *old, int oldlen, const char *new, int newlen,
- struct ptlrpc_request **request)
+ const char *old, size_t oldlen, const char *new, size_t newlen,
+ struct ptlrpc_request **request)
{
- CFS_LIST_HEAD(cancels);
+ struct list_head cancels = LIST_HEAD_INIT(cancels);
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req;
int count = 0, rc;
RETURN(-ENOMEM);
}
- mdc_set_capa_size(req, &RMF_CAPA1, op_data->op_capa1);
- mdc_set_capa_size(req, &RMF_CAPA2, op_data->op_capa2);
req_capsule_set_size(&req->rq_pill, &RMF_NAME, RCL_CLIENT, oldlen + 1);
req_capsule_set_size(&req->rq_pill, &RMF_SYMTGT, RCL_CLIENT, newlen+1);
mdc_rename_pack(req, op_data, old, oldlen, new, newlen);
- req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
- obd->u.cli.cl_max_mds_easize);
- req_capsule_set_size(&req->rq_pill, &RMF_LOGCOOKIES, RCL_SERVER,
- obd->u.cli.cl_max_mds_cookiesize);
- ptlrpc_request_set_replen(req);
+ req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
+ obd->u.cli.cl_default_mds_easize);
+ ptlrpc_request_set_replen(req);
- rc = mdc_reint(req, obd->u.cli.cl_rpc_lock, LUSTRE_IMP_FULL);
+ rc = mdc_reint(req, LUSTRE_IMP_FULL);
*request = req;
if (rc == -ERESTARTSYS)
rc = 0;