Details : Direct I/O operations should return the actual number of bytes
transferred rather than the requested size.
+Severity : enhancement
+Bugzilla : 10589
+Description: metadata RPC reduction (e.g. for rm performance)
+Details : decrease the number of synchronous RPCs between clients and servers
+ by cancelling conflicting locks on the client side before the operation
+ and packing their handles into the main operation RPC sent to the server.
+
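(Editor's sketch, not part of the patch: the client-side flow this change
enables, using the interfaces introduced below; res, mode, exp, bufcount
and size are placeholders.)

    CFS_LIST_HEAD(cancels);
    int count;

    /* 1. Cancel conflicting unused locks locally, collecting
     *    references to them on the cancels list. */
    count = ldlm_cancel_resource_local(res, &cancels, NULL, mode,
                                       0, 0, NULL);
    /* 2. Let the main operation RPC carry their handles to the server. */
    req = ldlm_prep_enqueue_req(exp, bufcount, size, &cancels, count);
    /* 3. On the server, ldlm_request_cancel() cancels these locks before
     *    processing the operation itself. */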
--------------------------------------------------------------------------------
2007-05-03 Cluster File Systems, Inc. <info@clusterfs.com>
#define OBD_CONNECT_OSS_CAPA 0x00200000ULL /* OSS capability */
#define OBD_CONNECT_MDS_MDS 0x00400000ULL /* MDS-MDS connection*/
#define OBD_CONNECT_SOM 0x00800000ULL /* SOM feature */
+#define OBD_CONNECT_CANCELSET 0x01000000ULL /* Early batched cancels. */
#define OBD_CONNECT_REAL 0x00000200ULL /* real connection */
/* also update obd_connect_names[] for lprocfs_rd_connect_flags()
* and lustre/utils/wirecheck.c */
OBD_CONNECT_LCL_CLIENT | \
OBD_CONNECT_RMT_CLIENT | \
OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA | \
- OBD_CONNECT_MDS_MDS)
+ OBD_CONNECT_MDS_MDS | OBD_CONNECT_CANCELSET)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
OBD_CONNECT_BRW_SIZE | OBD_CONNECT_QUOTA64 | \
- OBD_CONNECT_OSS_CAPA)
+ OBD_CONNECT_OSS_CAPA | OBD_CONNECT_CANCELSET)
#define ECHO_CONNECT_SUPPORTED (0)
#define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION)
#define OBD_OCD_VERSION_PATCH(version) ((int)((version)>>8)&255)
#define OBD_OCD_VERSION_FIX(version) ((int)(version)&255)
+#define exp_connect_cancelset(exp) \
+ ((exp) ? (exp)->exp_connect_flags & OBD_CONNECT_CANCELSET : 0)
+
/* This structure is used for both request and reply.
*
* If we eventually have separate connect data for different types, which we
extern void lustre_swab_mds_status_req (struct mds_status_req *r);
/* mdt_thread_info.mti_flags. */
-enum mdt_ioepoch_flags {
+enum md_op_flags {
/* The flag indicates Size-on-MDS attributes are changed. */
- MF_SOM_CHANGE = (1 << 0),
+ MF_SOM_CHANGE = (1 << 0),
/* Flags indicating whether an epoch opens or closes. */
- MF_EPOCH_OPEN = (1 << 1),
- MF_EPOCH_CLOSE = (1 << 2),
+ MF_EPOCH_OPEN = (1 << 1),
+ MF_EPOCH_CLOSE = (1 << 2),
+ MF_MDC_CANCEL_FID1 = (1 << 3),
+ MF_MDC_CANCEL_FID2 = (1 << 4),
+ MF_MDC_CANCEL_FID3 = (1 << 5),
+ MF_MDC_CANCEL_FID4 = (1 << 6),
};
+#define MF_SOM_LOCAL_FLAGS (MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID2 | \
+ MF_MDC_CANCEL_FID3 | MF_MDC_CANCEL_FID4)
+
#define MDS_BFLAG_UNCOMMITTED_WRITES 0x1
#define MDS_BFLAG_EXT_FLAGS 0x80000000 /* == EXT3_RESERVED_FL */
extern void lustre_swab_ldlm_lock_desc (struct ldlm_lock_desc *l);
+#define LDLM_LOCKREQ_HANDLES 2
+#define LDLM_ENQUEUE_CANCEL_OFF 1
+
struct ldlm_request {
__u32 lock_flags;
- __u32 lock_padding; /* also fix lustre_swab_ldlm_request */
+ __u32 lock_count;
struct ldlm_lock_desc lock_desc;
- struct lustre_handle lock_handle1;
- struct lustre_handle lock_handle2;
+ struct lustre_handle lock_handle[LDLM_LOCKREQ_HANDLES];
};
extern void lustre_swab_ldlm_request (struct ldlm_request *rq);
+/* For LDLM_ENQUEUE, 1 slot is already occupied and 1 is available.
+ * Otherwise, 2 are available. */
+#define ldlm_request_bufsize(count,type) \
+({ \
+ int _avail = LDLM_LOCKREQ_HANDLES; \
+ _avail -= (type == LDLM_ENQUEUE ? LDLM_ENQUEUE_CANCEL_OFF : 0); \
+ sizeof(struct ldlm_request) + \
+ (count > _avail ? count - _avail : 0) * \
+ sizeof(struct lustre_handle); \
+})
+
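(Editor's note, a worked example of ldlm_request_bufsize() above; the
numbers follow from LDLM_LOCKREQ_HANDLES == 2 and
LDLM_ENQUEUE_CANCEL_OFF == 1.)

    /* LDLM_ENQUEUE: one embedded handle is taken by the enqueued lock,
     * so a single cancel handle still fits in the base structure. */
    ldlm_request_bufsize(1, LDLM_ENQUEUE); /* == sizeof(struct ldlm_request) */
    ldlm_request_bufsize(3, LDLM_ENQUEUE); /* == sizeof(struct ldlm_request) +
                                            * 2 * sizeof(struct lustre_handle) */
    /* LDLM_CANCEL: both embedded handles are free for cancels. */
    ldlm_request_bufsize(2, LDLM_CANCEL);  /* == sizeof(struct ldlm_request) */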
struct ldlm_reply {
__u32 lock_flags;
__u32 lock_padding; /* also fix lustre_swab_ldlm_reply */
extern void lustre_swab_ldlm_reply (struct ldlm_reply *r);
-
/*
* Opcodes for mountconf (mgs and mgc)
*/
#define OBD_LDLM_DEVICENAME "ldlm"
#define LDLM_DEFAULT_LRU_SIZE (100 * smp_num_cpus)
+#define LDLM_DEFAULT_MAX_ALIVE (cfs_time_seconds(36000))
typedef enum {
ELDLM_OK = 0,
#define LDLM_FL_LOCK_PROTECT 0x8000000
#define LDLM_FL_LOCK_PROTECT_BIT 27
+/* Cancel lock asynchronously. See ldlm_cli_cancel_unused_resource. */
+#define LDLM_FL_ASYNC 0x20000000
/* The blocking callback is overloaded to perform two functions. These flags
* indicate which operation should be performed. */
#define LCK_COMPAT_PR (LCK_COMPAT_PW | LCK_PR)
#define LCK_COMPAT_CW (LCK_COMPAT_PW | LCK_CW)
#define LCK_COMPAT_CR (LCK_COMPAT_CW | LCK_PR | LCK_PW)
-#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX)
+#define LCK_COMPAT_NL (LCK_COMPAT_CR | LCK_EX | LCK_GROUP)
#define LCK_COMPAT_GROUP (LCK_GROUP | LCK_NL)
extern ldlm_mode_t lck_compat_array[];
spinlock_t ns_unused_lock;
unsigned int ns_max_unused;
+ unsigned int ns_max_age;
cfs_time_t ns_next_dump; /* next debug dump, jiffies */
atomic_t ns_locks;
/* ldlm_lock_change_resource() can change this */
struct ldlm_resource *l_resource;
- /* set once, no need to protect it */
- struct ldlm_lock *l_parent;
-
- /* protected by ns_hash_lock */
- struct list_head l_children;
- struct list_head l_childof;
-
/* protected by ns_hash_lock. FIXME */
struct list_head l_lru;
__u32 l_pid; /* pid which created this lock */
__u32 l_pidb; /* who holds LOCK_PROTECT_BIT */
- struct list_head l_tmp;
-
/* for ldlm_add_ast_work_item() */
struct list_head l_bl_ast;
struct list_head l_cp_ast;
struct semaphore lr_lvb_sem;
__u32 lr_lvb_len;
void *lr_lvb_data;
-
- /* lr_tmp holds a list head temporarily, during the building of a work
- * queue. see ldlm_add_ast_work_item and ldlm_run_ast_work */
- void *lr_tmp;
};
struct ldlm_ast_work {
ldlm_blocking_callback, ldlm_glimpse_callback);
int ldlm_handle_convert(struct ptlrpc_request *req);
int ldlm_handle_cancel(struct ptlrpc_request *req);
+int ldlm_request_cancel(struct ptlrpc_request *req,
+ const struct ldlm_request *dlm_req, int first);
int ldlm_del_waiting_lock(struct ldlm_lock *lock);
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock);
void ldlm_revoke_export_locks(struct obd_export *exp);
lock; \
})
+#define ldlm_lock_list_put(head, member, count) \
+({ \
+ struct ldlm_lock *_lock, *_next; \
+ int c = count; \
+ list_for_each_entry_safe(_lock, _next, head, member) { \
+ list_del_init(&_lock->member); \
+ LDLM_LOCK_PUT(_lock); \
+ if (--c == 0) \
+ break; \
+ } \
+})
+
struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock);
void ldlm_lock_put(struct ldlm_lock *lock);
void ldlm_lock_destroy(struct ldlm_lock *lock);
ldlm_glimpse_callback glimpse,
void *data, void *lvb, __u32 lvb_len, void *lvb_swabber,
struct lustre_handle *lockh, int async);
+struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
+ int bufcount, int *size,
+ struct list_head *head, int count);
int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
const struct ldlm_request *dlm_req,
const struct ldlm_callback_suite *cbs);
int ldlm_cli_cancel(struct lustre_handle *lockh);
int ldlm_cli_cancel_unused(struct ldlm_namespace *, const struct ldlm_res_id *,
int flags, void *opaque);
+int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
+ const struct ldlm_res_id *res_id,
+ ldlm_policy_data_t *policy,
+ int mode, int flags, void *opaque);
+int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *head,
+ int count, int flags);
int ldlm_cli_join_lru(struct ldlm_namespace *,
const struct ldlm_res_id *, int join);
-
+int ldlm_cancel_resource_local(struct ldlm_resource *res,
+ struct list_head *cancels,
+ ldlm_policy_data_t *policy,
+ ldlm_mode_t mode, int lock_flags,
+ int flags, void *opaque);
+int ldlm_cli_cancel_list(struct list_head *head, int count,
+ struct ptlrpc_request *req, int off, int flags);
+
/* mds/handler.c */
/* This has to be here because recursive inclusion sucks. */
int intent_disposition(struct ldlm_reply *rep, int flag);
struct md_op_data {
struct lu_fid op_fid1; /* operation fid1 (usualy parent) */
struct lu_fid op_fid2; /* operation fid2 (usualy child) */
+ struct lu_fid op_fid3; /* two extra fids to find locks */
+ struct lu_fid op_fid4; /* conflicting with the operation. */
mdsno_t op_mds; /* what mds server open will go to */
struct lustre_handle op_handle;
__u64 op_mod_time;
__u32 op_fsuid;
__u32 op_fsgid;
__u32 op_cap;
+ void *op_data;
/* iattr fields and blocks. */
struct iattr op_attr;
struct lustre_handle *);
int (*m_cancel_unused)(struct obd_export *, const struct lu_fid *,
- int flags, void *opaque);
+ ldlm_policy_data_t *, ldlm_mode_t, int flags,
+ void *opaque);
int (*m_renew_capa)(struct obd_export *, struct obd_capa *oc,
renew_capa_cb_t cb);
static inline int md_cancel_unused(struct obd_export *exp,
const struct lu_fid *fid,
- int flags, void *opaque)
+ ldlm_policy_data_t *policy,
+ ldlm_mode_t mode, int flags, void *opaque)
{
int rc;
ENTRY;
EXP_CHECK_MD_OP(exp, cancel_unused);
EXP_MD_COUNTER_INCREMENT(exp, cancel_unused);
- rc = MDP(exp->exp_obd, cancel_unused)(exp, fid, flags, opaque);
+ rc = MDP(exp->exp_obd, cancel_unused)(exp, fid, policy, mode,
+ flags, opaque);
RETURN(rc);
}
/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
* - blocking ASTs have already been sent
- * - the caller has already initialized req->lr_tmp
* - must call this function with the ns lock held
*
* If first_enq is 1 (ie, called from ldlm_lock_enqueue):
* - blocking ASTs have not been sent
- * - the caller has NOT initialized req->lr_tmp, so we must
* - must call this function with the ns lock held once */
int ldlm_process_extent_lock(struct ldlm_lock *lock, int *flags, int first_enq,
ldlm_error_t *err, struct list_head *work_list)
/* XXX - if ldlm_lock_new() can sleep we should
* release the ns_lock, allocate the new lock,
* and restart processing this lock. */
- new2 = ldlm_lock_create(ns, NULL, &res->lr_name, LDLM_FLOCK,
+ new2 = ldlm_lock_create(ns, &res->lr_name, LDLM_FLOCK,
lock->l_granted_mode, NULL, NULL, NULL,
NULL, 0);
if (!new2) {
/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
* - blocking ASTs have already been sent
- * - the caller has already initialized req->lr_tmp
* - must call this function with the ns lock held
*
* If first_enq is 1 (ie, called from ldlm_lock_enqueue):
* - blocking ASTs have not been sent
- * - the caller has NOT initialized req->lr_tmp, so we must
* - must call this function with the ns lock held once */
int ldlm_process_inodebits_lock(struct ldlm_lock *lock, int *flags,
int first_enq, ldlm_error_t *err,
LDLM_SYNC,
} ldlm_sync_t;
+/* Cancel lru flag: it indicates that we cancel aged locks. */
+#define LDLM_CANCEL_AGED 0x00000001
+
int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync);
+int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
+ int count, int max, int flags);
/* ldlm_resource.c */
int ldlm_resource_putref_locked(struct ldlm_resource *res);
/* ldlm_lock.c */
void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list);
struct ldlm_lock *
-ldlm_lock_create(struct ldlm_namespace *ns,
- const struct lustre_handle *parent_lock_handle,
- const struct ldlm_res_id *,
+ldlm_lock_create(struct ldlm_namespace *ns, const struct ldlm_res_id *,
ldlm_type_t type, ldlm_mode_t, ldlm_blocking_callback,
ldlm_completion_callback, ldlm_glimpse_callback, void *data,
__u32 lvb_len);
/* ldlm_lockd.c */
int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct ldlm_lock *lock);
+ struct ldlm_lock *lock, int flags);
void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
struct ldlm_lock_desc *ld, struct ldlm_lock *lock);
LASSERT(list_empty(&lock->l_res_link));
LASSERT(list_empty(&lock->l_pending_chain));
- if (lock->l_parent)
- LDLM_LOCK_PUT(lock->l_parent);
-
atomic_dec(&res->lr_namespace->ns_locks);
ldlm_resource_putref(res);
lock->l_resource = NULL;
{
ENTRY;
- if (!list_empty(&lock->l_children)) {
- LDLM_ERROR(lock, "still has children (%p)!",
- lock->l_children.next);
- ldlm_lock_dump(D_ERROR, lock, 0);
- LBUG();
- }
if (lock->l_readers || lock->l_writers) {
LDLM_ERROR(lock, "lock still has references");
ldlm_lock_dump(D_ERROR, lock, 0);
* after return, ldlm_*_put the resource and parent
* returns: lock with refcount 2 - one for current caller and one for remote
*/
-static struct ldlm_lock *ldlm_lock_new(struct ldlm_lock *parent,
- struct ldlm_resource *resource)
+static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
{
struct ldlm_lock *lock;
ENTRY;
lock->l_resource = ldlm_resource_getref(resource);
atomic_set(&lock->l_refc, 2);
- CFS_INIT_LIST_HEAD(&lock->l_children);
CFS_INIT_LIST_HEAD(&lock->l_res_link);
CFS_INIT_LIST_HEAD(&lock->l_lru);
CFS_INIT_LIST_HEAD(&lock->l_export_chain);
CFS_INIT_LIST_HEAD(&lock->l_pending_chain);
- CFS_INIT_LIST_HEAD(&lock->l_tmp);
CFS_INIT_LIST_HEAD(&lock->l_bl_ast);
CFS_INIT_LIST_HEAD(&lock->l_cp_ast);
cfs_waitq_init(&lock->l_waitq);
lock->l_sl_policy.next = NULL;
atomic_inc(&resource->lr_namespace->ns_locks);
-
- if (parent != NULL) {
- spin_lock(&resource->lr_namespace->ns_hash_lock);
- lock->l_parent = LDLM_LOCK_GET(parent);
- list_add(&lock->l_childof, &parent->l_children);
- spin_unlock(&resource->lr_namespace->ns_hash_lock);
- }
-
CFS_INIT_LIST_HEAD(&lock->l_handle.h_link);
class_handle_hash(&lock->l_handle, lock_handle_addref);
ldlm_lock_remove_from_lru(lock);
unlock_res_and_lock(lock);
if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
- ldlm_bl_to_thread(ns, NULL, lock) != 0)
+ ldlm_bl_to_thread(ns, NULL, lock, 0) != 0)
ldlm_handle_bl_callback(ns, NULL, lock);
} else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
!lock->l_readers && !lock->l_writers &&
* reference, put it on the LRU. */
LASSERT(list_empty(&lock->l_lru));
LASSERT(ns->ns_nr_unused >= 0);
+ lock->l_last_used = cfs_time_current();
spin_lock(&ns->ns_unused_lock);
list_add_tail(&lock->l_lru, &ns->ns_unused_list);
ns->ns_nr_unused++;
spin_unlock(&ns->ns_unused_lock);
unlock_res_and_lock(lock);
- ldlm_cancel_lru(ns, LDLM_ASYNC);
+ /* Call ldlm_cancel_lru() only if EARLY_CANCEL is not supported
+ * by the server; otherwise it is done on enqueue. */
+ if (!exp_connect_cancelset(lock->l_conn_export))
+ ldlm_cancel_lru(ns, LDLM_ASYNC);
} else {
unlock_res_and_lock(lock);
}
/* Returns a referenced lock */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
- const struct lustre_handle *parent_lock_handle,
const struct ldlm_res_id *res_id,
ldlm_type_t type,
ldlm_mode_t mode,
ldlm_glimpse_callback glimpse,
void *data, __u32 lvb_len)
{
- struct ldlm_resource *res, *parent_res = NULL;
- struct ldlm_lock *lock, *parent_lock = NULL;
+ struct ldlm_lock *lock;
+ struct ldlm_resource *res;
ENTRY;
- if (parent_lock_handle) {
- parent_lock = ldlm_handle2lock(parent_lock_handle);
- if (parent_lock)
- parent_res = parent_lock->l_resource;
- }
-
- res = ldlm_resource_get(ns, parent_res, res_id, type, 1);
+ res = ldlm_resource_get(ns, NULL, res_id, type, 1);
if (res == NULL)
RETURN(NULL);
- lock = ldlm_lock_new(parent_lock, res);
+ lock = ldlm_lock_new(res);
ldlm_resource_putref(res);
- if (parent_lock != NULL)
- LDLM_LOCK_PUT(parent_lock);
if (lock == NULL)
RETURN(NULL);
struct ldlm_namespace *blwi_ns;
struct ldlm_lock_desc blwi_ld;
struct ldlm_lock *blwi_lock;
+ int blwi_flags;
};
#ifdef __KERNEL__
instant_cancel = 1;
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
- body->lock_handle1 = lock->l_remote_handle;
+ body->lock_handle[0] = lock->l_remote_handle;
body->lock_desc = *desc;
body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);
RETURN(-ENOMEM);
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
- body->lock_handle1 = lock->l_remote_handle;
+ body->lock_handle[0] = lock->l_remote_handle;
body->lock_flags = flags;
ldlm_lock2desc(lock, &body->lock_desc);
RETURN(-ENOMEM);
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
- body->lock_handle1 = lock->l_remote_handle;
+ body->lock_handle[0] = lock->l_remote_handle;
ldlm_lock2desc(lock, &body->lock_desc);
lock_res_and_lock(lock);
LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
+ ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF);
flags = dlm_req->lock_flags;
LASSERT(req->rq_export);
if (unlikely(flags & LDLM_FL_REPLAY)) {
lock = find_existing_lock(req->rq_export,
- &dlm_req->lock_handle1);
+ &dlm_req->lock_handle[0]);
if (lock != NULL) {
DEBUG_REQ(D_HA, req, "found existing lock cookie "LPX64,
lock->l_handle.h_cookie);
}
/* The lock's callback data might be set in the policy function */
- lock = ldlm_lock_create(ns, &dlm_req->lock_handle2,
- &dlm_req->lock_desc.l_resource.lr_name,
+ lock = ldlm_lock_create(ns, &dlm_req->lock_desc.l_resource.lr_name,
dlm_req->lock_desc.l_resource.lr_type,
dlm_req->lock_desc.l_req_mode,
cbs->lcs_blocking, cbs->lcs_completion,
GOTO(out, rc = -ENOMEM);
do_gettimeofday(&lock->l_enqueued_time);
- lock->l_remote_handle = dlm_req->lock_handle1;
+ lock->l_remote_handle = dlm_req->lock_handle[0];
LDLM_DEBUG(lock, "server-side enqueue handler, new lock created");
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
sizeof(*dlm_rep));
dlm_rep->lock_flags = dlm_req->lock_flags;
- lock = ldlm_handle2lock(&dlm_req->lock_handle1);
+ lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
if (!lock) {
req->rq_status = EINVAL;
} else {
return rc;
}
+/* Cancel all the locks whose handles are packed into the ldlm_request. */
+int ldlm_request_cancel(struct ptlrpc_request *req,
+ const struct ldlm_request *dlm_req, int first)
+{
+ struct ldlm_resource *res, *pres = NULL;
+ struct ldlm_lock *lock;
+ int i, count, done = 0;
+ ENTRY;
+
+ count = dlm_req->lock_count ? dlm_req->lock_count : 1;
+ if (first >= count)
+ RETURN(0);
+
+ /* There are no locks on the server at replay time, so skip
+ * lock cancelling to make the replay tests pass. */
+ if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+ RETURN(0);
+
+ LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks",
+ count - first);
+ for (i = first; i < count; i++) {
+ lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
+ if (!lock) {
+ LDLM_DEBUG_NOLOCK("server-side cancel handler stale "
+ "lock (cookie "LPU64")",
+ dlm_req->lock_handle[i].cookie);
+ continue;
+ }
+
+ res = lock->l_resource;
+ done++;
+ ldlm_lock_cancel(lock);
+ if (ldlm_del_waiting_lock(lock))
+ CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
+
+ if (res != pres) {
+ if (pres != NULL) {
+ if (pres->lr_namespace->ns_lvbo &&
+ pres->lr_namespace->ns_lvbo->lvbo_update) {
+ (void)pres->lr_namespace->ns_lvbo->
+ lvbo_update(pres, NULL, 0, 1);
+ }
+ ldlm_reprocess_all(pres);
+ ldlm_resource_putref(pres);
+ }
+ if (res != NULL)
+ ldlm_resource_getref(res);
+ pres = res;
+ }
+ LDLM_LOCK_PUT(lock);
+ }
+ if (pres != NULL) {
+ if (pres->lr_namespace->ns_lvbo &&
+ pres->lr_namespace->ns_lvbo->lvbo_update) {
+ (void)pres->lr_namespace->ns_lvbo->
+ lvbo_update(pres, NULL, 0, 1);
+ }
+ ldlm_reprocess_all(pres);
+ ldlm_resource_putref(pres);
+ }
+ LDLM_DEBUG_NOLOCK("server-side cancel handler END");
+ RETURN(done);
+}
+
int ldlm_handle_cancel(struct ptlrpc_request *req)
{
struct ldlm_request *dlm_req;
- struct ldlm_lock *lock;
- struct ldlm_resource *res;
int rc;
ENTRY;
CERROR("out of memory\n");
RETURN(-ENOMEM);
}
-
- lock = ldlm_handle2lock(&dlm_req->lock_handle1);
- if (!lock) {
- CERROR("received cancel for unknown lock cookie "LPX64
- " from client %s id %s\n",
- dlm_req->lock_handle1.cookie,
- req->rq_export->exp_client_uuid.uuid,
- libcfs_id2str(req->rq_peer));
- LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock "
- "(cookie "LPU64")",
- dlm_req->lock_handle1.cookie);
+
+ if (!ldlm_request_cancel(req, dlm_req, 0))
req->rq_status = ESTALE;
- } else {
- LDLM_DEBUG(lock, "server-side cancel handler START");
- res = lock->l_resource;
- if (res && res->lr_namespace->ns_lvbo &&
- res->lr_namespace->ns_lvbo->lvbo_update) {
- (void)res->lr_namespace->ns_lvbo->lvbo_update
- (res, NULL, 0, 0);
- }
-
- ldlm_lock_cancel(lock);
- if (ldlm_del_waiting_lock(lock))
- CDEBUG(D_DLMTRACE, "cancelled waiting lock %p\n", lock);
- req->rq_status = rc;
- }
if (ptlrpc_reply(req) != 0)
LBUG();
- if (lock) {
- ldlm_reprocess_all(lock->l_resource);
- LDLM_DEBUG(lock, "server-side cancel handler END");
- LDLM_LOCK_PUT(lock);
- }
-
RETURN(0);
}
if (lock->l_granted_mode == LCK_PW &&
!lock->l_readers && !lock->l_writers &&
cfs_time_after(cfs_time_current(),
- cfs_time_add(lock->l_last_used, cfs_time_seconds(10)))) {
+ cfs_time_add(lock->l_last_used,
+ cfs_time_seconds(10)))) {
unlock_res_and_lock(lock);
- if (ldlm_bl_to_thread(ns, NULL, lock))
+ if (ldlm_bl_to_thread(ns, NULL, lock, 0))
ldlm_handle_bl_callback(ns, NULL, lock);
EXIT;
}
int ldlm_bl_to_thread(struct ldlm_namespace *ns, struct ldlm_lock_desc *ld,
- struct ldlm_lock *lock)
+ struct ldlm_lock *lock, int flags)
{
#ifdef __KERNEL__
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
if (ld != NULL)
blwi->blwi_ld = *ld;
blwi->blwi_lock = lock;
+ blwi->blwi_flags = flags;
spin_lock(&blp->blp_lock);
list_add_tail(&blwi->blwi_entry, &blp->blp_list);
lustre_swab_ldlm_request);
if (dlm_req != NULL)
CDEBUG(D_RPCTRACE, "--> lock cookie: "LPX64"\n",
- dlm_req->lock_handle1.cookie);
+ dlm_req->lock_handle[0].cookie);
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
RETURN (0);
}
- lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle1);
+ lock = ldlm_handle2lock_ns(ns, &dlm_req->lock_handle[0]);
if (!lock) {
CDEBUG(D_INODE, "callback on lock "LPX64" - lock disappeared\n",
- dlm_req->lock_handle1.cookie);
+ dlm_req->lock_handle[0].cookie);
ldlm_callback_reply(req, -EINVAL);
RETURN(0);
}
CDEBUG(D_INODE, "blocking ast\n");
if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK))
ldlm_callback_reply(req, 0);
- if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock))
+ if (ldlm_bl_to_thread(ns, &dlm_req->lock_desc, lock, 0))
ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
break;
case LDLM_CP_CALLBACK:
sizeof(*dlm_req),
lustre_swab_ldlm_request);
if (dlm_req != NULL)
- ldlm_lock_dump_handle(D_ERROR, &dlm_req->lock_handle1);
-
+ ldlm_lock_dump_handle(D_ERROR,
+ &dlm_req->lock_handle[0]);
ldlm_callback_reply(req, -ENOTCONN);
RETURN(0);
}
if (blwi->blwi_ns == NULL)
break;
- ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
- blwi->blwi_lock);
+ if (blwi->blwi_flags == LDLM_FL_CANCELING) {
+ /* The special case when we cancel lru locks
+ * asynchronously: the lock was first removed from
+ * l_bl_ast explicitly in ldlm_cancel_lru before
+ * being sent to this thread. Thus the lock is marked
+ * LDLM_FL_CANCELING and is already cancelled locally. */
+ CFS_LIST_HEAD(head);
+ LASSERT(list_empty(&blwi->blwi_lock->l_bl_ast));
+ list_add(&blwi->blwi_lock->l_bl_ast, &head);
+ ldlm_cli_cancel_req(blwi->blwi_lock->l_conn_export,
+ &head, 1, 0);
+ LDLM_LOCK_PUT(blwi->blwi_lock);
+ } else {
+ ldlm_handle_bl_callback(blwi->blwi_ns, &blwi->blwi_ld,
+ blwi->blwi_lock);
+ }
OBD_FREE(blwi, sizeof(*blwi));
}
EXPORT_SYMBOL(ldlm_blocking_ast);
EXPORT_SYMBOL(ldlm_glimpse_ast);
EXPORT_SYMBOL(ldlm_expired_completion_wait);
+EXPORT_SYMBOL(ldlm_prep_enqueue_req);
EXPORT_SYMBOL(ldlm_cli_convert);
EXPORT_SYMBOL(ldlm_cli_enqueue);
EXPORT_SYMBOL(ldlm_cli_enqueue_fini);
EXPORT_SYMBOL(ldlm_cli_enqueue_local);
EXPORT_SYMBOL(ldlm_cli_cancel);
EXPORT_SYMBOL(ldlm_cli_cancel_unused);
+EXPORT_SYMBOL(ldlm_cli_cancel_unused_resource);
+EXPORT_SYMBOL(ldlm_cli_cancel_req);
EXPORT_SYMBOL(ldlm_cli_join_lru);
EXPORT_SYMBOL(ldlm_replay_locks);
EXPORT_SYMBOL(ldlm_resource_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach);
EXPORT_SYMBOL(ldlm_namespace_foreach_res);
EXPORT_SYMBOL(ldlm_resource_iterate);
+EXPORT_SYMBOL(ldlm_cancel_resource_local);
+EXPORT_SYMBOL(ldlm_cli_cancel_list);
/* ldlm_lockd.c */
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_enqueue0);
EXPORT_SYMBOL(ldlm_handle_cancel);
+EXPORT_SYMBOL(ldlm_request_cancel);
EXPORT_SYMBOL(ldlm_handle_convert);
EXPORT_SYMBOL(ldlm_handle_convert0);
EXPORT_SYMBOL(ldlm_del_waiting_lock);
/* If first_enq is 0 (ie, called from ldlm_reprocess_queue):
* - blocking ASTs have already been sent
- * - the caller has already initialized req->lr_tmp
* - must call this function with the resource lock held
*
* If first_enq is 1 (ie, called from ldlm_lock_enqueue):
* - blocking ASTs have not been sent
- * - the caller has NOT initialized req->lr_tmp, so we must
* - must call this function with the resource lock held */
int ldlm_process_plain_lock(struct ldlm_lock *lock, int *flags, int first_enq,
ldlm_error_t *err, struct list_head *work_list)
LBUG();
}
- lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
+ lock = ldlm_lock_create(ns, res_id, type, mode, blocking,
completion, glimpse, data, lvb_len);
if (unlikely(!lock))
GOTO(out_nolock, err = -ENOMEM);
return rc;
}
+/* PAGE_SIZE-512 allows the TCP/IP and LNET headers to fit into
+ * a single page on the send/receive side. XXX: 512 should be changed
+ * to a more adequate value. */
+#define ldlm_req_handles_avail(exp, size, bufcount, off) \
+({ \
+ int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); \
+ int _s = size[DLM_LOCKREQ_OFF]; \
+ size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); \
+ _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \
+ bufcount, size); \
+ _avail /= sizeof(struct lustre_handle); \
+ _avail += LDLM_LOCKREQ_HANDLES - off; \
+ size[DLM_LOCKREQ_OFF] = _s; \
+ _avail; \
+})
+
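(Editor's note: in words, the macro above budgets min(LDLM_MAXREQSIZE,
PAGE_SIZE - 512) bytes for the whole request, subtracts the size of a
message whose ldlm_request buffer is shrunk to its base size, converts
the remainder into lustre_handle slots, and adds back the handles embedded
in struct ldlm_request itself minus the @off slots reserved for other
uses, e.g. the enqueued lock. Roughly, with msg_base standing for that
lustre_msg_size() call:)

    avail = (min(LDLM_MAXREQSIZE, PAGE_SIZE - 512) - msg_base)
            / sizeof(struct lustre_handle)
            + LDLM_LOCKREQ_HANDLES - off;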
+/* Cancel lru locks and pack them into the enqueue request. Additionally
+ * pack the given @count locks from @cancels. */
+struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
+ int bufcount, int *size,
+ struct list_head *cancels,
+ int count)
+{
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct ldlm_request *dlm = NULL;
+ struct ptlrpc_request *req;
+ CFS_LIST_HEAD(head);
+ ENTRY;
+
+ if (cancels == NULL)
+ cancels = &head;
+ if (exp_connect_cancelset(exp)) {
+ /* Estimate the amount of available space in the request. */
+ int avail = ldlm_req_handles_avail(exp, size, bufcount,
+ LDLM_ENQUEUE_CANCEL_OFF);
+ LASSERT(avail >= count);
+
+ /* Cancel lru locks here _only_ if the server supports
+ * EARLY_CANCEL. Otherwise we would have to send an extra
+ * CANCEL rpc right at enqueue time, which would make it
+ * slower, vs. an asynchronous rpc from the blocking thread. */
+ count += ldlm_cancel_lru_local(ns, cancels, 1, avail - count,
+ LDLM_CANCEL_AGED);
+ size[DLM_LOCKREQ_OFF] =
+ ldlm_request_bufsize(count, LDLM_ENQUEUE);
+ }
+ req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+ LDLM_ENQUEUE, bufcount, size, NULL);
+ if (exp_connect_cancelset(exp) && req) {
+ dlm = lustre_msg_buf(req->rq_reqmsg,
+ DLM_LOCKREQ_OFF, sizeof(*dlm));
+ /* Skip the first lock handle slot; ldlm_cancel_pack() will
+ * increment @lock_count according to the number of lock
+ * handles actually written to the buffer. */
+ dlm->lock_count = LDLM_ENQUEUE_CANCEL_OFF;
+ }
+ if (req)
+ ldlm_cli_cancel_list(cancels, count, req, DLM_LOCKREQ_OFF, 0);
+ else
+ ldlm_lock_list_put(cancels, l_bl_ast, count);
+ RETURN(req);
+}
+
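(Editor's note on the two call patterns: with cancels == NULL the function
still drains aged lru locks into the request on its own, as in the
ldlm_cli_enqueue hunk further below; a caller that has already collected
conflicting locks, e.g. the early-cancel path, passes them in explicitly.
bufcount, size, cancels and count are placeholders.)

    /* Plain enqueue: only aged lru locks are packed, if any. */
    req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);

    /* Early cancel: previously collected conflicting locks are packed too. */
    req = ldlm_prep_enqueue_req(exp, bufcount, size, &cancels, count);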
/* If a request has some specific initialisation it is passed in @reqp,
* otherwise it is created in ldlm_cli_enqueue.
*
LDLM_DEBUG(lock, "client-side enqueue START");
LASSERT(exp == lock->l_conn_export);
} else {
- lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking,
+ lock = ldlm_lock_create(ns, res_id, type, mode, blocking,
completion, glimpse, data, lvb_len);
if (lock == NULL)
RETURN(-ENOMEM);
/* lock not sent to server yet */
if (reqp == NULL || *reqp == NULL) {
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, 2, size, NULL);
+ req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
if (req == NULL) {
failed_lock_cleanup(ns, lock, lockh, mode);
LDLM_LOCK_PUT(lock);
*reqp = req;
} else {
req = *reqp;
- LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) ==
+ LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
sizeof(*body), "buflen[%d] = %d, not "LPSZ"\n",
DLM_LOCKREQ_OFF,
lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF),
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
ldlm_lock2desc(lock, &body->lock_desc);
body->lock_flags = *flags;
- body->lock_handle1 = *lockh;
+ body->lock_handle[0] = *lockh;
/* Continue as normal. */
if (!req_passed_in) {
GOTO(out, rc = -ENOMEM);
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
- body->lock_handle1 = lock->l_remote_handle;
+ body->lock_handle[0] = lock->l_remote_handle;
body->lock_desc.l_req_mode = new_mode;
body->lock_flags = *flags;
return rc;
}
-int ldlm_cli_cancel(struct lustre_handle *lockh)
+/* Cancel a lock locally.
+ * Returns: 1 if a cancel RPC needs to be sent to the server, 0 otherwise. */
+static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
{
- struct ptlrpc_request *req;
- struct ldlm_lock *lock;
- struct ldlm_request *body;
- int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(*body) };
int rc = 0;
ENTRY;
-
- /* concurrent cancels on the same handle can happen */
- lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
- if (lock == NULL)
- RETURN(0);
if (lock->l_conn_export) {
int local_only;
- struct obd_import *imp;
LDLM_DEBUG(lock, "client-side cancel");
/* Set this flag to prevent others from getting new references*/
(LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
ldlm_cancel_callback(lock);
unlock_res_and_lock(lock);
-
- if (local_only) {
+
+ if (local_only)
CDEBUG(D_INFO, "not sending request (at caller's "
"instruction)\n");
- goto local_cancel;
+ else
+ rc = 1;
+
+ ldlm_lock_cancel(lock);
+ } else {
+ if (lock->l_resource->lr_namespace->ns_client) {
+ LDLM_ERROR(lock, "Trying to cancel local lock");
+ LBUG();
}
+ LDLM_DEBUG(lock, "server-side local cancel");
+ ldlm_lock_cancel(lock);
+ ldlm_reprocess_all(lock->l_resource);
+ LDLM_DEBUG(lock, "server-side local cancel handler END");
+ }
+
+ RETURN(rc);
+}
+
+/* Pack @count locks from @head into the ldlm_request buffer at offset @off
+ * of the request @req. */
+static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
+ struct list_head *head, int count)
+{
+ struct ldlm_request *dlm;
+ struct ldlm_lock *lock;
+ int max;
+ ENTRY;
+
+ dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
+ LASSERT(dlm != NULL);
+
+ /* Check the room in the request buffer. */
+ max = lustre_msg_buflen(req->rq_reqmsg, off) -
+ sizeof(struct ldlm_request);
+ max /= sizeof(struct lustre_handle);
+ max += LDLM_LOCKREQ_HANDLES;
+ LASSERT(max >= dlm->lock_count + count);
+
+ /* XXX: it would be better to pack lock handles grouped by resource
+ * so that the server-side cancel would call filter_lvbo_update()
+ * less frequently. */
+ list_for_each_entry(lock, head, l_bl_ast) {
+ if (!count--)
+ break;
+ /* Pack the lock handle into the given request buffer. */
+ LASSERT(lock->l_conn_export);
+ /* These flags cannot be set on a lock in a resource's
+ * granted list. */
+ LASSERT(!(lock->l_flags &
+ (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK)));
+ /* If @lock is marked CANCEL_ON_BLOCK, the cancel is not sent
+ * in ldlm_cli_cancel(); the flag is used by liblustre
+ * clients, which issue no cancel-on-block requests. However,
+ * even for liblustre clients a batched cancel should still
+ * be sent when the flag is set (a blocking rpc may never
+ * come). To avoid sending another separate rpc in this case,
+ * the caller passes the CANCEL_ON_BLOCK flag to
+ * ldlm_cli_cancel_unused_resource(). */
+ dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
+ }
+ EXIT;
+}
+
+/* Prepare and send a batched cancel rpc; it will include @count lock
+ * handles of the locks given in @cancels. */
+int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
+ int count, int flags)
+{
+ struct ptlrpc_request *req = NULL;
+ struct ldlm_request *body;
+ int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
+ [DLM_LOCKREQ_OFF] = sizeof(*body) };
+ struct obd_import *imp;
+ int free, sent = 0;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(exp != NULL);
+ LASSERT(count > 0);
- restart:
- imp = class_exp2cliimp(lock->l_conn_export);
+ free = ldlm_req_handles_avail(exp, size, 2, 0);
+ if (count > free)
+ count = free;
+
+ size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
+ while (1) {
+ imp = class_exp2cliimp(exp);
if (imp == NULL || imp->imp_invalid) {
CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
imp);
- goto local_cancel;
+ break;
}
req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
+
req->rq_no_resend = 1;
/* XXX FIXME bug 249 */
body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
sizeof(*body));
- body->lock_handle1 = lock->l_remote_handle;
+ ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count);
ptlrpc_req_set_repsize(req, 1, NULL);
- rc = ptlrpc_queue_wait(req);
-
+ if (flags & LDLM_FL_ASYNC) {
+ ptlrpcd_add_req(req);
+ sent = count;
+ GOTO(out, 0);
+ } else {
+ rc = ptlrpc_queue_wait(req);
+ }
if (rc == ESTALE) {
- CDEBUG(D_DLMTRACE, "client/server (nid %s) out of sync "
- "-- not fatal, flags %x\n",
+ CDEBUG(D_DLMTRACE, "client/server (nid %s) "
+ "out of sync -- not fatal\n",
libcfs_nid2str(req->rq_import->
- imp_connection->c_peer.nid),
- lock->l_flags);
+ imp_connection->c_peer.nid));
} else if (rc == -ETIMEDOUT) {
ptlrpc_req_finished(req);
- GOTO(restart, rc);
+ continue;
} else if (rc != ELDLM_OK) {
CERROR("Got rc %d from cancel RPC: canceling "
"anyway\n", rc);
+ break;
}
-
- ptlrpc_req_finished(req);
- local_cancel:
- ldlm_lock_cancel(lock);
- } else {
- if (lock->l_resource->lr_namespace->ns_client) {
- LDLM_ERROR(lock, "Trying to cancel local lock");
- LBUG();
- }
- LDLM_DEBUG(lock, "client-side local cancel");
- ldlm_lock_cancel(lock);
- ldlm_reprocess_all(lock->l_resource);
- LDLM_DEBUG(lock, "client-side local cancel handler END");
+ sent = count;
+ break;
}
+ ptlrpc_req_finished(req);
EXIT;
- out:
- LDLM_LOCK_PUT(lock);
- return rc;
+out:
+ return sent ? sent : rc;
}
-/* when called with LDLM_ASYNC the blocking callback will be handled
- * in a thread and this function will return after the thread has been
- * asked to call the callback. when called with LDLM_SYNC the blocking
- * callback will be performed in this function. */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
+int ldlm_cli_cancel(struct lustre_handle *lockh)
{
- struct ldlm_lock *lock, *next;
- int count, rc = 0;
- CFS_LIST_HEAD(cblist);
+ struct ldlm_lock *lock;
+ CFS_LIST_HEAD(head);
+ int rc = 0;
ENTRY;
-#ifndef __KERNEL__
- sync = LDLM_SYNC; /* force to be sync in user space */
-#endif
+ /* concurrent cancels on the same handle can happen */
+ lock = __ldlm_handle2lock(lockh, LDLM_FL_CANCELING);
+ if (lock == NULL)
+ RETURN(0);
- spin_lock(&ns->ns_unused_lock);
- count = ns->ns_nr_unused - ns->ns_max_unused;
+ rc = ldlm_cli_cancel_local(lock);
+ if (rc <= 0)
+ GOTO(out, rc);
- if (count <= 0) {
- spin_unlock(&ns->ns_unused_lock);
- RETURN(0);
- }
+ list_add(&lock->l_bl_ast, &head);
+ rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
+ EXIT;
+out:
+ LDLM_LOCK_PUT(lock);
+ return rc < 0 ? rc : 0;
+}
+/* - Free space in the lru for @count new locks:
+ * redundant unused locks are cancelled locally;
+ * - also cancel locally unused aged locks;
+ * - do not cancel more than @max locks;
+ * - GET the found locks and add them to the @cancels list.
+ *
+ * A client lock can be added to the l_bl_ast list only when it is
+ * marked LDLM_FL_CANCELING; otherwise, somebody is already doing CANCEL.
+ * There are the following use cases: ldlm_cancel_resource_local(),
+ * ldlm_cancel_lru_local() and ldlm_cli_cancel(), which check&set this
+ * flag properly. As any attempt to cancel a lock relies on this flag,
+ * the l_bl_ast list is accessed later without any special locking. */
+int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
+ int count, int max, int flags)
+{
+ cfs_time_t cur = cfs_time_current();
+ struct ldlm_lock *lock, *next;
+ int rc, added = 0, left;
+ ENTRY;
+
+ spin_lock(&ns->ns_unused_lock);
+ count += ns->ns_nr_unused - ns->ns_max_unused;
while (!list_empty(&ns->ns_unused_list)) {
struct list_head *tmp = ns->ns_unused_list.next;
lock = list_entry(tmp, struct ldlm_lock, l_lru);
- LASSERT(!lock->l_readers && !lock->l_writers);
+
+ if (max && added >= max)
+ break;
+
+ if ((added >= count) &&
+ (!(flags & LDLM_CANCEL_AGED) ||
+ cfs_time_before_64(cur, ns->ns_max_age +
+ lock->l_last_used)))
+ break;
LDLM_LOCK_GET(lock); /* dropped by bl thread */
spin_unlock(&ns->ns_unused_lock);
lock_res_and_lock(lock);
- if (ldlm_lock_remove_from_lru(lock) == 0) {
- /* other thread is removing lock from lru */
+ if ((ldlm_lock_remove_from_lru(lock) == 0) ||
+ (lock->l_flags & LDLM_FL_CANCELING)) {
+ /* other thread is removing lock from lru or
+ * somebody is already doing CANCEL. */
unlock_res_and_lock(lock);
LDLM_LOCK_PUT(lock);
spin_lock(&ns->ns_unused_lock);
continue;
-
}
+ LASSERT(!lock->l_readers && !lock->l_writers);
/* If we have chosen to cancel this lock voluntarily, we better
send a cancel notification to the server, so that it frees
* the lock can accumulate no more readers/writers. Since
* readers and writers are already zero here, ldlm_lock_decref
* won't see this flag and call l_blocking_ast */
- lock->l_flags |= LDLM_FL_CBPENDING;
-
+ lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING;
/* We can't re-add to l_lru as it confuses the refcounting in
* ldlm_lock_remove_from_lru() if an AST arrives after we drop
- * ns_lock below. We use l_tmp and can't use l_pending_chain as
- * it is used both on server and client nevertheles bug 5666
+ * ns_lock below. We use l_bl_ast and can't use l_pending_chain
+ * as it is used both on server and client; nevertheless, bug 5666
* says it is used only on server. --umka */
- list_add(&lock->l_tmp, &cblist);
- unlock_res_and_lock(lock);
-
- LDLM_LOCK_GET(lock); /* to hold lock after bl thread */
- if (sync == LDLM_ASYNC && (ldlm_bl_to_thread(ns, NULL, lock) == 0)) {
- lock_res_and_lock(lock);
- list_del_init(&lock->l_tmp);
- unlock_res_and_lock(lock);
- }
- LDLM_LOCK_PUT(lock);
+ LASSERT(list_empty(&lock->l_bl_ast));
+ list_add(&lock->l_bl_ast, cancels);
+ unlock_res_and_lock(lock);
spin_lock(&ns->ns_unused_lock);
-
- if (--count == 0)
- break;
+ added++;
}
spin_unlock(&ns->ns_unused_lock);
- list_for_each_entry_safe(lock, next, &cblist, l_tmp) {
- list_del_init(&lock->l_tmp);
- ldlm_handle_bl_callback(ns, NULL, lock);
- }
-
- RETURN(rc);
+ /* Handle only @added inserted locks. */
+ left = added;
+ list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
+ if (left-- == 0)
+ break;
+ rc = ldlm_cli_cancel_local(lock);
+ if (rc == 0) {
+ /* CANCEL RPC should not be sent to server. */
+ list_del_init(&lock->l_bl_ast);
+ LDLM_LOCK_PUT(lock);
+ added--;
+ }
+ }
+ RETURN(added);
}
-static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
- const struct ldlm_res_id *res_id,
- int flags, void *opaque)
+/* When called with LDLM_ASYNC, the blocking callback will be handled
+ * in a thread and this function will return after the thread has been
+ * asked to call the callback. When called with LDLM_SYNC, the blocking
+ * callback will be performed in this function. */
+int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
{
- struct list_head *tmp, *next, list = CFS_LIST_HEAD_INIT(list);
- struct ldlm_resource *res;
- struct ldlm_lock *lock;
+ CFS_LIST_HEAD(cancels);
+ int count, rc;
ENTRY;
- res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
- if (res == NULL) {
- /* This is not a problem. */
- CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
- RETURN(0);
+#ifndef __KERNEL__
+ sync = LDLM_SYNC; /* force to be sync in user space */
+#endif
+ count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0);
+ if (sync == LDLM_ASYNC) {
+ struct ldlm_lock *lock, *next;
+ list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) {
+ /* Remove from the list so that the blocking thread
+ * can re-use l_bl_ast. */
+ list_del_init(&lock->l_bl_ast);
+ rc = ldlm_bl_to_thread(ns, NULL, lock,
+ LDLM_FL_CANCELING);
+ if (rc)
+ list_add_tail(&lock->l_bl_ast, &next->l_bl_ast);
+ }
}
- lock_res(res);
- list_for_each(tmp, &res->lr_granted) {
- lock = list_entry(tmp, struct ldlm_lock, l_res_link);
+ /* If some locks are left in the list in ASYNC mode, or
+ * this is SYNC mode, cancel the list. */
+ ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF, 0);
+ RETURN(0);
+}
+/* Find and cancel locally unused locks found on the resource, matching the
+ * given policy and mode. GET the found locks and add them to the @cancels
+ * list. */
+int ldlm_cancel_resource_local(struct ldlm_resource *res,
+ struct list_head *cancels,
+ ldlm_policy_data_t *policy,
+ ldlm_mode_t mode, int lock_flags,
+ int flags, void *opaque)
+{
+ struct ldlm_lock *lock, *next;
+ int count = 0, left;
+ ENTRY;
+
+ lock_res(res);
+ list_for_each_entry(lock, &res->lr_granted, l_res_link) {
if (opaque != NULL && lock->l_ast_data != opaque) {
LDLM_ERROR(lock, "data %p doesn't match opaque %p",
lock->l_ast_data, opaque);
continue;
}
+ if (lockmode_compat(lock->l_granted_mode, mode))
+ continue;
+
+ /* If a policy is given and this is an IBITS lock, add to the
+ * list only those locks that match the policy. */
+ if (policy && (lock->l_resource->lr_type == LDLM_IBITS) &&
+ !(lock->l_policy_data.l_inodebits.bits &
+ policy->l_inodebits.bits))
+ continue;
+
+ /* If somebody is already doing CANCEL, skip it. */
+ if (lock->l_flags & LDLM_FL_CANCELING)
+ continue;
+
/* See CBPENDING comment in ldlm_cancel_lru */
- lock->l_flags |= LDLM_FL_CBPENDING;
+ lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
+ lock_flags;
LASSERT(list_empty(&lock->l_bl_ast));
- list_add(&lock->l_bl_ast, &list);
+ list_add(&lock->l_bl_ast, cancels);
LDLM_LOCK_GET(lock);
+ count++;
}
unlock_res(res);
- list_for_each_safe(tmp, next, &list) {
- struct lustre_handle lockh;
- int rc;
- lock = list_entry(tmp, struct ldlm_lock, l_bl_ast);
+ /* Handle only @count inserted locks. */
+ left = count;
+ list_for_each_entry_safe(lock, next, cancels, l_bl_ast) {
+ int rc = 0;
- if (flags & LDLM_FL_LOCAL_ONLY) {
+ if (left-- == 0)
+ break;
+ if (flags & LDLM_FL_LOCAL_ONLY)
ldlm_lock_cancel(lock);
+ else
+ rc = ldlm_cli_cancel_local(lock);
+
+ if (rc == 0) {
+ /* CANCEL RPC should not be sent to server. */
+ list_del_init(&lock->l_bl_ast);
+ LDLM_LOCK_PUT(lock);
+ count--;
+ }
+ }
+ RETURN(count);
+}
+
+/* If @req is NULL, send the CANCEL request to the server with the handles
+ * of the locks in @cancels. If EARLY_CANCEL is not supported, send CANCEL
+ * requests separately, one per lock.
+ * If @req is not NULL, put the handles of the locks in @cancels into the
+ * request buffer at offset @off.
+ * Destroy @cancels at the end. */
+int ldlm_cli_cancel_list(struct list_head *cancels, int count,
+ struct ptlrpc_request *req, int off, int flags)
+{
+ struct ldlm_lock *lock;
+ int res = 0;
+ ENTRY;
+
+ if (list_empty(cancels) || count == 0)
+ RETURN(0);
+
+ /* XXX: requests (both batched and not) could be sent in parallel.
+ * Usually it is enough to have just 1 RPC, but it is possible that
+ * there are too many locks to be cancelled in the LRU or on a
+ * resource. It would also speed up the case when the server does
+ * not support the feature. */
+ while (count > 0) {
+ LASSERT(!list_empty(cancels));
+ lock = list_entry(cancels->next, struct ldlm_lock, l_bl_ast);
+ LASSERT(lock->l_conn_export);
+
+ if (exp_connect_cancelset(lock->l_conn_export)) {
+ res = count;
+ if (req)
+ ldlm_cancel_pack(req, off, cancels, count);
+ else
+ res = ldlm_cli_cancel_req(lock->l_conn_export,
+ cancels, count, flags);
} else {
- ldlm_lock2handle(lock, &lockh);
- rc = ldlm_cli_cancel(&lockh);
- if (rc != ELDLM_OK)
- CERROR("ldlm_cli_cancel: %d\n", rc);
+ res = ldlm_cli_cancel_req(lock->l_conn_export,
+ cancels, 1, flags);
+ }
+
+ if (res < 0) {
+ CERROR("ldlm_cli_cancel_list: %d\n", res);
+ res = count;
}
- list_del_init(&lock->l_bl_ast);
- LDLM_LOCK_PUT(lock);
+
+ count -= res;
+ ldlm_lock_list_put(cancels, l_bl_ast, res);
}
+ LASSERT(list_empty(cancels));
+ LASSERT(count == 0);
+ RETURN(0);
+}
- ldlm_resource_putref(res);
+int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
+ const struct ldlm_res_id *res_id,
+ ldlm_policy_data_t *policy,
+ int mode, int flags, void *opaque)
+{
+ struct ldlm_resource *res;
+ CFS_LIST_HEAD(cancels);
+ int count;
+ int rc;
+ ENTRY;
+
+ res = ldlm_resource_get(ns, NULL, res_id, 0, 0);
+ if (res == NULL) {
+ /* This is not a problem. */
+ CDEBUG(D_INFO, "No resource "LPU64"\n", res_id->name[0]);
+ RETURN(0);
+ }
+ count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
+ 0, flags, opaque);
+ rc = ldlm_cli_cancel_list(&cancels, count, NULL,
+ DLM_LOCKREQ_OFF, flags);
+ if (rc != ELDLM_OK)
+ CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
+
+ ldlm_resource_putref(res);
RETURN(0);
}
* that have 0 readers/writers.
*
* If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying
- * to notify the server.
- * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */
+ * to notify the server. */
int ldlm_cli_cancel_unused(struct ldlm_namespace *ns,
const struct ldlm_res_id *res_id,
int flags, void *opaque)
RETURN(ELDLM_OK);
if (res_id)
- RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, flags,
+ RETURN(ldlm_cli_cancel_unused_resource(ns, res_id, NULL,
+ LCK_MINMODE, flags,
opaque));
spin_lock(&ns->ns_hash_lock);
spin_unlock(&ns->ns_hash_lock);
rc = ldlm_cli_cancel_unused_resource(ns, &res->lr_name,
+ NULL, LCK_MINMODE,
flags, opaque);
if (rc)
- CERROR("cancel_unused_res ("LPU64"): %d\n",
+ CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n",
res->lr_name.name[0], rc);
spin_lock(&ns->ns_hash_lock);
!lock->l_readers && !lock->l_writers &&
!(lock->l_flags & LDLM_FL_LOCAL) &&
!(lock->l_flags & LDLM_FL_CBPENDING)) {
+ lock->l_last_used = cfs_time_current();
spin_lock(&ns->ns_unused_lock);
LASSERT(ns->ns_nr_unused >= 0);
list_add_tail(&lock->l_lru, &ns->ns_unused_list);
ldlm_lock2desc(lock, &body->lock_desc);
body->lock_flags = flags;
- ldlm_lock2handle(lock, &body->lock_handle1);
+ ldlm_lock2handle(lock, &body->lock_handle[0]);
size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
if (lock->l_lvb_len != 0) {
buffers = 3;
return snprintf(page, count, "%u\n", *temp);
}
+#define MAX_STRING_SIZE 128
+static int lprocfs_uint_wr(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ unsigned *p = data;
+ char dummy[MAX_STRING_SIZE + 1], *end;
+ unsigned long tmp;
+
+ if (count >= sizeof(dummy))
+ count = sizeof(dummy) - 1;
+ if (copy_from_user(dummy, buffer, count))
+ return -EFAULT;
+ dummy[count] = '\0';
+
+ tmp = simple_strtoul(dummy, &end, 0);
+ if (dummy == end)
+ return -EINVAL;
+
+ *p = (unsigned int)tmp;
+ return count;
+}
+
static int lprocfs_read_lru_size(char *page, char **start, off_t off,
int count, int *eof, void *data)
{
return snprintf(page, count, "%u\n", ns->ns_max_unused);
}
-#define MAX_STRING_SIZE 128
static int lprocfs_write_lru_size(struct file *file, const char *buffer,
unsigned long count, void *data)
{
lock_vars[0].read_fptr = lprocfs_read_lru_size;
lock_vars[0].write_fptr = lprocfs_write_lru_size;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+ snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_max_age",
+ ns->ns_name);
+ lock_vars[0].data = &ns->ns_max_age;
+ lock_vars[0].read_fptr = lprocfs_uint_rd;
+ lock_vars[0].write_fptr = lprocfs_uint_wr;
+ lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
}
}
#undef MAX_STRING_SIZE
CFS_INIT_LIST_HEAD(&ns->ns_unused_list);
ns->ns_nr_unused = 0;
ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
+ ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE;
spin_lock_init(&ns->ns_unused_lock);
mutex_down(&ldlm_namespace_lock);
lock->l_flags |= LDLM_FL_FAILED;
lock->l_flags |= flags;
+ /* ... without sending a CANCEL message for local_only. */
+ if (local_only)
+ lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+
if (local_only && (lock->l_readers || lock->l_writers)) {
/* This is a little bit gross, but much better than the
* alternative: pretend that we got a blocking AST from
* the server, so that when the lock is decref'd, it
* will go away ... */
- /* ... without sending a CANCEL message. */
- lock->l_flags |= LDLM_FL_LOCAL_ONLY;
unlock_res(res);
LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
if (lock->l_completion_ast)
unlock_res(res);
ldlm_lock2handle(lock, &lockh);
- if (!local_only) {
- rc = ldlm_cli_cancel(&lockh);
- if (rc)
- CERROR("ldlm_cli_cancel: %d\n", rc);
- }
- /* Force local cleanup on errors, too. */
- if (local_only || rc != ELDLM_OK)
- ldlm_lock_cancel(lock);
+ rc = ldlm_cli_cancel(&lockh);
+ if (rc)
+ CERROR("ldlm_cli_cancel: %d\n", rc);
} else {
ldlm_resource_unlink_lock(lock);
unlock_res(res);
op_data->op_mode = mode;
op_data->op_namelen = namelen;
op_data->op_mod_time = CURRENT_TIME;
+ op_data->op_data = NULL;
}
void llu_finish_md_op_data(struct md_op_data *op_data)
parent = de->d_parent->d_inode;
if (it->it_op & IT_CREAT) {
- op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
- de->d_name.len, 0, LUSTRE_OPC_CREATE);
+ op_data = ll_prep_md_op_data(NULL, parent, NULL,
+ de->d_name.name, de->d_name.len,
+ 0, LUSTRE_OPC_CREATE, NULL);
} else {
op_data = ll_prep_md_op_data(NULL, parent, de->d_inode,
de->d_name.name, de->d_name.len,
- 0, LUSTRE_OPC_ANY);
+ 0, LUSTRE_OPC_ANY, NULL);
}
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
op_data = ll_prep_md_op_data(NULL, parent, NULL, de->d_name.name,
de->d_name.len, 0, (it->it_op & IT_CREAT ?
LUSTRE_OPC_CREATE :
- LUSTRE_OPC_ANY));
+ LUSTRE_OPC_ANY), NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
struct md_op_data *op_data;
op_data = ll_prep_md_op_data(NULL, dir, NULL, NULL, 0, 0,
- LUSTRE_OPC_ANY);
+ LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
return (void *)op_data;
lustre_swab_lov_user_md(lump);
op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
- LUSTRE_OPC_ANY);
+ LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
op_data = ll_prep_md_op_data(NULL, parent->d_inode,
file->f_dentry->d_inode, name, len,
- O_RDWR, LUSTRE_OPC_ANY);
+ O_RDWR, LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
.it_flags = head_filp->f_flags|O_JOIN_FILE};
struct lustre_handle lockh;
struct md_op_data *op_data;
- __u32 hsize = head_inode->i_size >> 32;
- __u32 tsize = head_inode->i_size;
int rc;
ENTRY;
op_data = ll_prep_md_op_data(NULL, head_inode, tail_parent,
tail_dentry->d_name.name,
tail_dentry->d_name.len, 0,
- LUSTRE_OPC_ANY);
+ LUSTRE_OPC_ANY, &head_inode->i_size);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
rc = md_enqueue(ll_i2mdexp(head_inode), LDLM_IBITS, &oit, LCK_CW,
- op_data, &lockh, &tsize, 0, ldlm_completion_ast,
- ll_md_blocking_ast, &hsize, 0);
+ op_data, &lockh, NULL, 0, ldlm_completion_ast,
+ ll_md_blocking_ast, NULL, 0);
ll_finish_md_op_data(op_data);
if (rc < 0)
/* Call getattr by fid, so do not provide name at all. */
op_data = ll_prep_md_op_data(NULL, dentry->d_parent->d_inode,
dentry->d_inode, NULL, 0, 0,
- LUSTRE_OPC_ANY);
+ LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
struct dentry *ll_find_alias(struct inode *, struct dentry *);
int ll_md_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
void *data, int flag);
-int ll_md_cancel_unused(struct lustre_handle *, struct inode *, int flags,
- void *opaque);
#ifndef LUSTRE_KERNEL_VERSION
struct lookup_intent *ll_convert_intent(struct open_intent *oit,
int lookup_flags);
struct md_op_data *ll_prep_md_op_data(struct md_op_data *op_data,
struct inode *i1, struct inode *i2,
const char *name, int namelen,
- int mode, __u32 opc);
+ int mode, __u32 opc, void *data);
void ll_finish_md_op_data(struct md_op_data *op_data);
/* llite/llite_nfs.c */
data->ocd_connect_flags = OBD_CONNECT_IBITS | OBD_CONNECT_NODEVOH |
OBD_CONNECT_JOIN |
OBD_CONNECT_ATTRFID | OBD_CONNECT_VERSION |
- OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA;
+ OBD_CONNECT_MDS_CAPA | OBD_CONNECT_OSS_CAPA |
+ OBD_CONNECT_CANCELSET;
#ifdef CONFIG_FS_POSIX_ACL
data->ocd_connect_flags |= OBD_CONNECT_ACL;
#endif
}
data->ocd_connect_flags = OBD_CONNECT_GRANT | OBD_CONNECT_VERSION |
- OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE;
+ OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
+ OBD_CONNECT_CANCELSET;
if (sbi->ll_flags & LL_SBI_OSS_CAPA)
data->ocd_connect_flags |= OBD_CONNECT_OSS_CAPA;
ENTRY;
op_data = ll_prep_md_op_data(op_data, inode, NULL, NULL, 0, 0,
- LUSTRE_OPC_ANY);
+ LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
RETURN(-ENOMEM);
op_data = ll_prep_md_op_data(NULL, inode, NULL, NULL, 0, 0,
- LUSTRE_OPC_ANY);
+ LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
struct md_op_data * ll_prep_md_op_data(struct md_op_data *op_data,
struct inode *i1, struct inode *i2,
const char *name, int namelen,
- int mode, __u32 opc)
+ int mode, __u32 opc, void *data)
{
LASSERT(i1 != NULL);
op_data->op_bias = MDS_CHECK_SPLIT;
op_data->op_opc = opc;
op_data->op_mds = 0;
+ op_data->op_data = data;
return op_data;
}
if (inode == NULL)
break;
+ LASSERT(lock->l_flags & LDLM_FL_CANCELING);
+ if ((bits & MDS_INODELOCK_LOOKUP) &&
+ ll_have_md_lock(inode, MDS_INODELOCK_LOOKUP))
+ bits &= ~MDS_INODELOCK_LOOKUP;
+ if ((bits & MDS_INODELOCK_UPDATE) &&
+ ll_have_md_lock(inode, MDS_INODELOCK_UPDATE))
+ bits &= ~MDS_INODELOCK_UPDATE;
+ if ((bits & MDS_INODELOCK_OPEN) &&
+ ll_have_md_lock(inode, MDS_INODELOCK_OPEN))
+ bits &= ~MDS_INODELOCK_OPEN;
+
fid = ll_inode2fid(inode);
if (lock->l_resource->lr_name.name[0] != fid_seq(fid) ||
lock->l_resource->lr_name.name[1] != fid_oid(fid) ||
opc = LUSTRE_OPC_ANY;
op_data = ll_prep_md_op_data(NULL, parent, NULL, dentry->d_name.name,
- dentry->d_name.len, lookup_flags, opc);
+ dentry->d_name.len, lookup_flags, opc,
+ NULL);
if (IS_ERR(op_data))
RETURN((void *)op_data);
tgt_len = strlen(tgt) + 1;
op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name,
- name->len, 0, opc);
+ name->len, 0, opc, NULL);
if (IS_ERR(op_data))
GOTO(err_exit, err = PTR_ERR(op_data));
dir->i_generation, dir, name->len, name->name);
op_data = ll_prep_md_op_data(NULL, src, dir, name->name, name->len,
- 0, LUSTRE_OPC_ANY);
+ 0, LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
RETURN(err);
}
+/* Try to find the child dentry by its name.
+ * If found, put the resulting fid into @fid. */
+static void ll_get_child_fid(struct inode * dir, struct qstr *name,
+ struct lu_fid *fid)
+{
+ struct dentry *parent, *child;
+
+ parent = list_entry(dir->i_dentry.next, struct dentry, d_alias);
+ child = d_lookup(parent, name);
+ if (child) {
+ if (child->d_inode)
+ *fid = *ll_inode2fid(child->d_inode);
+ dput(child);
+ }
+}
+
static int ll_rmdir_generic(struct inode *dir, struct dentry *dparent,
struct dentry *dchild, struct qstr *name)
{
RETURN(-EBUSY);
op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name, name->len,
- S_IFDIR, LUSTRE_OPC_ANY);
+ S_IFDIR, LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
+ ll_get_child_fid(dir, name, &op_data->op_fid3);
rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
ll_finish_md_op_data(op_data);
if (rc == 0)
RETURN(-EBUSY);
op_data = ll_prep_md_op_data(NULL, dir, NULL, name->name,
- name->len, 0, LUSTRE_OPC_ANY);
+ name->len, 0, LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
+ ll_get_child_fid(dir, name, &op_data->op_fid3);
rc = md_unlink(ll_i2sbi(dir)->ll_md_exp, op_data, &request);
ll_finish_md_op_data(op_data);
RETURN(-EBUSY);
op_data = ll_prep_md_op_data(NULL, src, tgt, NULL, 0, 0,
- LUSTRE_OPC_ANY);
+ LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
+ ll_get_child_fid(src, src_name, &op_data->op_fid3);
+ ll_get_child_fid(tgt, tgt_name, &op_data->op_fid4);
err = md_rename(sbi->ll_md_exp, op_data,
src_name->name, src_name->len,
tgt_name->name, tgt_name->len, &request);
CDEBUG(D_OTHER, "CREATE '%*s' on "DFID"\n", op_data->op_namelen,
op_data->op_name, PFID(&op_data->op_fid1));
+ op_data->op_flags |= MF_MDC_CANCEL_FID1;
rc = md_create(tgt_exp, op_data, data, datalen, mode, uid, gid,
cap_effective, rdev, request);
if (rc == 0) {
RETURN(rc);
}
+#define md_op_data_fid(op_data, fl) \
+ (fl == MF_MDC_CANCEL_FID1 ? &op_data->op_fid1 : \
+ fl == MF_MDC_CANCEL_FID2 ? &op_data->op_fid2 : \
+ fl == MF_MDC_CANCEL_FID3 ? &op_data->op_fid3 : \
+ fl == MF_MDC_CANCEL_FID4 ? &op_data->op_fid4 : \
+ NULL)
+
+/* @tgt_exp is the export the metadata request is sent to.
+ * @fid_exp is the export the cancel for the current fid should be sent to;
+ * if @fid_exp is NULL, the export is looked up for the current fid.
+ * @op_data keeps the current fid, which is pointed at through @flag.
+ * @mode, @bits: lock match parameters. */
+static int lmv_early_cancel(struct lmv_obd *lmv, struct obd_export *tgt_exp,
+ struct obd_export *fid_exp,
+ struct md_op_data *op_data,
+ ldlm_mode_t mode, int bits, int flag)
+{
+ struct lu_fid *fid = md_op_data_fid(op_data, flag);
+ ldlm_policy_data_t policy = {{0}};
+ int rc = 0;
+ ENTRY;
+
+ if (!fid_is_sane(fid))
+ RETURN(0);
+
+ if (fid_exp == NULL)
+ fid_exp = lmv_find_export(lmv, fid);
+
+ if (tgt_exp == fid_exp) {
+ /* The export is the same as the target server's; the cancel
+ * will be sent along with the main metadata operation. */
+ op_data->op_flags |= flag;
+ RETURN(0);
+ }
+
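+ /* Different export: send an async cancel for the matching locks
+ * directly to the MDS holding the fid. */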
+ policy.l_inodebits.bits = bits;
+ rc = md_cancel_unused(fid_exp, fid, &policy, mode, LDLM_FL_ASYNC, NULL);
+ RETURN(rc);
+}
+
+#ifdef EARLY_CANCEL_FOR_STRIPED_DIR_IS_READY
+/* Check whether the fid in @op_data selected by @flag lives on the same
+ * export as @tgt_exp. If so, the early cancel is packed later by the mdc
+ * code; otherwise md_cancel_unused() is called on the child export(s). */
+static int lmv_early_cancel_stripes(struct obd_export *exp,
+ struct obd_export *tgt_exp,
+ struct md_op_data *op_data,
+ ldlm_mode_t mode, int bits, int flag)
+{
+ struct lu_fid *fid = md_op_data_fid(op_data, flag);
+ struct obd_device *obd = exp->exp_obd;
+ struct lmv_obd *lmv = &obd->u.lmv;
+ struct obd_export *st_exp;
+ struct lmv_obj *obj;
+ int rc = 0;
+ ENTRY;
+
+ if (!fid_is_sane(fid))
+ RETURN(0);
+
+ obj = lmv_obj_grab(obd, fid);
+ if (obj) {
+ ldlm_policy_data_t policy = {{0}};
+ struct lu_fid *st_fid;
+ int i;
+
+ policy.l_inodebits.bits = bits;
+ for (i = 0; i < obj->lo_objcount; i++) {
+ st_exp = lmv_get_export(lmv, obj->lo_inodes[i].li_mds);
+ st_fid = &obj->lo_inodes[i].li_fid;
+ if (tgt_exp != st_exp) {
+ rc = md_cancel_unused(st_exp, st_fid, &policy,
+ mode, 0, NULL);
+ if (rc)
+ break;
+ } else {
+ /* This stripe's export matches @tgt_exp; the cancel
+ * for its fid is packed in mdc. */
+ *fid = *st_fid;
+ op_data->op_flags |= flag;
+ }
+ }
+ lmv_obj_put(obj);
+ } else {
+ rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data,
+ mode, bits, flag);
+ }
+ RETURN(rc);
+}
+#endif
+
/*
 * llite passes the fid of the target inode in op_data->op_fid1 and the fid
 * of the directory in op_data->op_fid2
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
+ struct obd_export *tgt_exp;
struct lmv_obj *obj;
int rc, loop = 0;
mdsno_t mds;
op_data->op_fsgid = current->fsgid;
op_data->op_cap = current->cap_effective;
- rc = md_link(lmv->tgts[mds].ltd_exp, op_data, request);
+ tgt_exp = lmv->tgts[mds].ltd_exp;
+ if (op_data->op_namelen) {
+ op_data->op_flags |= MF_MDC_CANCEL_FID2;
+ /* Cancel the UPDATE lock on the target inode (fid1). */
+ rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX,
+ MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID1);
+ }
+ if (rc == 0)
+ rc = md_link(tgt_exp, op_data, request);
if (rc == -ERESTART) {
LASSERT(*request != NULL);
DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
const char *old, int oldlen, const char *new, int newlen,
struct ptlrpc_request **request)
{
+ struct obd_export *tgt_exp = NULL, *src_exp;
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
int rc, mea_idx, loop = 0;
struct lmv_obj *obj;
- mdsno_t mds;
+ mdsno_t mds1, mds2;
ENTRY;
CDEBUG(D_OTHER, "rename %*s in "DFID" to %*s in "DFID"\n",
"to "DFID"\n", newlen, new, oldlen, newlen,
PFID(&op_data->op_fid2), PFID(&op_data->op_fid1));
- rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds);
+ rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds1);
if (rc)
RETURN(rc);
mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)old, oldlen);
op_data->op_fid1 = obj->lo_inodes[mea_idx].li_fid;
- mds = obj->lo_inodes[mea_idx].li_mds;
+ mds1 = obj->lo_inodes[mea_idx].li_mds;
CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid1));
lmv_obj_put(obj);
} else {
- rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds);
+ rc = lmv_fld_lookup(lmv, &op_data->op_fid1, &mds1);
if (rc)
RETURN(rc);
}
mea_idx = raw_name2idx(obj->lo_hashtype, obj->lo_objcount,
(char *)new, newlen);
+ mds2 = obj->lo_inodes[mea_idx].li_mds;
op_data->op_fid2 = obj->lo_inodes[mea_idx].li_fid;
CDEBUG(D_OTHER, "Parent obj "DFID"\n", PFID(&op_data->op_fid2));
lmv_obj_put(obj);
+ } else {
+ rc = lmv_fld_lookup(lmv, &op_data->op_fid2, &mds2);
+ if (rc)
+ RETURN(rc);
}
request:
op_data->op_fsgid = current->fsgid;
op_data->op_cap = current->cap_effective;
- rc = md_rename(lmv->tgts[mds].ltd_exp, op_data, old, oldlen,
- new, newlen, request);
+ src_exp = lmv_get_export(lmv, mds1);
+ tgt_exp = lmv_get_export(lmv, mds2);
+ if (oldlen) {
+ /* The LOOKUP lock on the src child (fid3) is also cancelled
+ * on src_exp in mdc_rename(). */
+ op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+
+ /* Cancel UPDATE locks on the tgt parent (fid2); tgt_exp is
+ * its own export. */
+ rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data, LCK_EX,
+ MDS_INODELOCK_UPDATE, MF_MDC_CANCEL_FID2);
+
+ /* Cancel LOOKUP locks on the tgt child (fid4) for the parent's tgt_exp. */
+ if (rc == 0)
+ rc = lmv_early_cancel(lmv, src_exp, tgt_exp, op_data,
+ LCK_EX, MDS_INODELOCK_LOOKUP,
+ MF_MDC_CANCEL_FID4);
+
+ /* XXX: the case when the child is a striped dir is not supported.
+ * Only the master stripe has all its locks cancelled early. */
+ /* Cancel all the locks on tgt child (fid4). */
+ if (rc == 0)
+ rc = lmv_early_cancel(lmv, src_exp, NULL, op_data,
+ LCK_EX, MDS_INODELOCK_FULL,
+ MF_MDC_CANCEL_FID4);
+ }
+
+ if (rc == 0)
+ rc = md_rename(src_exp, op_data, old, oldlen,
+ new, newlen, request);
if (rc == -ERESTART) {
LASSERT(*request != NULL);
DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
PFID(&op_data->op_fid1), op_data->op_attr.ia_valid,
obj ? ", split" : "");
+ op_data->op_flags |= MF_MDC_CANCEL_FID1;
if (obj) {
for (i = 0; i < obj->lo_objcount; i++) {
op_data->op_fid1 = obj->lo_inodes[i].li_fid;
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
struct obd_export *tgt_exp = NULL;
+ struct lmv_obj *obj;
int rc, loop = 0;
ENTRY;
++loop;
LASSERT(loop <= 2);
if (op_data->op_namelen != 0) {
- struct lmv_obj *obj;
int mea_idx;
obj = lmv_obj_grab(obd, &op_data->op_fid1);
op_data->op_fsgid = current->fsgid;
op_data->op_cap = current->cap_effective;
- rc = md_unlink(tgt_exp, op_data, request);
+ /* If the child's fid is known, cancel its unused locks when it lives
+ * on a different export than the parent. */
+ if (op_data->op_namelen) {
+ /* The LOOKUP lock on the child (fid3) is also cancelled on
+ * the parent's tgt_exp in mdc_unlink(). */
+ op_data->op_flags |= MF_MDC_CANCEL_FID1 | MF_MDC_CANCEL_FID3;
+
+ /* XXX: the case when the child is a striped dir is not supported.
+ * Only the master stripe has all its locks cancelled early. */
+ /* Cancel FULL locks on child (fid3). */
+ rc = lmv_early_cancel(lmv, tgt_exp, NULL, op_data, LCK_EX,
+ MDS_INODELOCK_FULL, MF_MDC_CANCEL_FID3);
+ }
+ if (rc == 0)
+ rc = md_unlink(tgt_exp, op_data, request);
if (rc == -ERESTART) {
LASSERT(*request != NULL);
DEBUG_REQ(D_WARNING|D_RPCTRACE, *request,
static int lmv_cancel_unused(struct obd_export *exp,
const struct lu_fid *fid,
- int flags, void *opaque)
+ ldlm_policy_data_t *policy,
+ ldlm_mode_t mode, int flags, void *opaque)
{
struct obd_device *obd = exp->exp_obd;
struct lmv_obd *lmv = &obd->u.lmv;
if (!lmv->tgts[i].ltd_exp || !lmv->tgts[i].ltd_active)
continue;
- err = md_cancel_unused(lmv->tgts[i].ltd_exp,
- fid, flags, opaque);
+ err = md_cancel_unused(lmv->tgts[i].ltd_exp, fid,
+ policy, mode, flags, opaque);
if (!rc)
rc = err;
}
ldlm_completion_callback cb_completion,
ldlm_blocking_callback cb_blocking,
void *cb_data, int extra_lock_flags);
-
+int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
+ struct list_head *cancels, ldlm_mode_t mode,
+ __u64 bits);
/* mdc/mdc_request.c */
int mdc_fid_alloc(struct obd_export *exp, struct lu_fid *fid,
struct md_op_data *op_data);
int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request);
int mdc_cancel_unused(struct obd_export *exp, const struct lu_fid *fid,
+ ldlm_policy_data_t *policy, ldlm_mode_t mode,
int flags, void *opaque);
int mdc_lock_match(struct obd_export *exp, int flags,
const struct lu_fid *fid, ldlm_type_t type,
rec->cr_time = op_data->op_mod_time;
rec->cr_suppgid1 = op_data->op_suppgids[0];
rec->cr_suppgid2 = op_data->op_suppgids[1];
- rec->cr_flags = op_data->op_flags;
+ rec->cr_flags = op_data->op_flags & ~MF_SOM_LOCAL_FLAGS;
rec->cr_bias = op_data->op_bias;
mdc_pack_capa(req, offset + 1, op_data->op_capa1);
{
memcpy(&epoch->handle, &op_data->op_handle, sizeof(epoch->handle));
epoch->ioepoch = op_data->op_ioepoch;
- epoch->flags = op_data->op_flags;
+ epoch->flags = op_data->op_flags & ~MF_SOM_LOCAL_FLAGS;
}
void mdc_setattr_pack(struct ptlrpc_request *req, int offset,
int mdc_cancel_unused(struct obd_export *exp,
const struct lu_fid *fid,
- int flags, void *opaque)
+ ldlm_policy_data_t *policy,
+ ldlm_mode_t mode, int flags, void *opaque)
{
struct ldlm_res_id res_id =
{ .name = {fid_seq(fid),
ENTRY;
- rc = ldlm_cli_cancel_unused(obd->obd_namespace, &res_id,
- flags, opaque);
+ rc = ldlm_cli_cancel_unused_resource(obd->obd_namespace, &res_id,
+ policy, mode, flags, opaque);
RETURN(rc);
}
struct ldlm_reply *lockrep;
int size[9] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREQ_OFF] = sizeof(*lockreq),
- [DLM_INTENT_IT_OFF] = sizeof(*lit) };
+ [DLM_INTENT_IT_OFF] = sizeof(*lit),
+ 0, 0, 0, 0, 0, 0 };
int repsize[7] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREPLY_OFF] = sizeof(*lockrep),
[DLM_REPLY_REC_OFF] = sizeof(struct mdt_body),
if (it->it_op & IT_OPEN) {
int do_join = !!(it->it_flags & O_JOIN_FILE);
+ CFS_LIST_HEAD(cancels);
+ int count = 0;
+ int mode;
it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
*/
size[DLM_INTENT_REC_OFF + 4] = max(lmmsize,
obddev->u.cli.cl_default_mds_easize);
+
+ /* XXX: the open lock is not cancelled for cross-ref objects. */
+ /* If inode is known, cancel conflicting OPEN locks. */
+ if (fid_is_sane(&op_data->op_fid2)) {
+ if (it->it_flags & (FMODE_WRITE|MDS_OPEN_TRUNC))
+ mode = LCK_CW;
+#ifdef FMODE_EXEC
+ else if (it->it_flags & FMODE_EXEC)
+ mode = LCK_PR;
+#endif
+ else
+ mode = LCK_CR;
+ count = mdc_resource_get_unused(exp, &op_data->op_fid2,
+ &cancels, mode,
+ MDS_INODELOCK_OPEN);
+ }
+
+ /* If CREATE or JOIN_FILE, cancel parent's UPDATE lock. */
+ if (it->it_op & IT_CREAT || it->it_flags & O_JOIN_FILE)
+ mode = LCK_EX;
+ else
+ mode = LCK_CR;
+ count += mdc_resource_get_unused(exp, &op_data->op_fid1,
+ &cancels, mode,
+ MDS_INODELOCK_UPDATE);
+
if (do_join)
size[DLM_INTENT_REC_OFF + 5] =
sizeof(struct mdt_rec_join);
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, 8 + do_join, size, NULL);
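+ /* ldlm_prep_enqueue_req() packs the collected cancel handles
+ * into the enqueue request, so the conflicting locks are
+ * cancelled by the same RPC (see LDLM_ENQUEUE_CANCEL_OFF). */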
+ req = ldlm_prep_enqueue_req(exp, 8 + do_join, size, &cancels,
+ count);
if (!req)
RETURN(-ENOMEM);
if (do_join) {
- __u64 head_size = *(__u32*)cb_data;
- __u32 tsize = *(__u32*)lmm;
-
/* join is like an unlink of the tail */
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- /* when joining file, cb_data and lmm args together
- * indicate the head file size*/
mdc_join_pack(req, DLM_INTENT_REC_OFF + 5, op_data,
- (head_size << 32) | tsize);
- cb_data = NULL;
- lmm = NULL;
+ (*(__u64 *)op_data->op_data));
}
spin_lock(&req->rq_lock);
sizeof(struct lustre_capa) : 0;
size[DLM_INTENT_REC_OFF + 2] = op_data->op_namelen + 1;
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, 6, size, NULL);
+ req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0);
if (!req)
RETURN(-ENOMEM);
if (it->it_op & IT_GETATTR)
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, 6, size, NULL);
+ req = ldlm_prep_enqueue_req(exp, 6, size, NULL, 0);
if (!req)
RETURN(-ENOMEM);
repsize[repbufcnt++] = sizeof(struct lustre_capa);
} else if (it->it_op == IT_READDIR) {
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, 2, size, NULL);
+ req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
if (!req)
RETURN(-ENOMEM);
#include <obd_class.h>
#include "mdc_internal.h"
+#include <lustre_fid.h>
/* mdc_setattr does its own semaphore handling */
static int mdc_reint(struct ptlrpc_request *request,
return rc;
}
+/* Find and cancel locally unused locks matching the inode @bits and @mode in
+ * the resource identified by @fid. Found locks are added to the @cancels
+ * list. Returns the number of locks added. */
+int mdc_resource_get_unused(struct obd_export *exp, struct lu_fid *fid,
+ struct list_head *cancels, ldlm_mode_t mode,
+ __u64 bits)
+{
+ ldlm_policy_data_t policy = {{0}};
+ struct ldlm_res_id res_id;
+ struct ldlm_resource *res;
+ int count;
+ ENTRY;
+
+ fid_build_reg_res_name(fid, &res_id);
+ res = ldlm_resource_get(exp->exp_obd->obd_namespace,
+ NULL, &res_id, 0, 0);
+ if (res == NULL)
+ RETURN(0);
+
+ /* Initialize ibits lock policy. */
+ policy.l_inodebits.bits = bits;
+ count = ldlm_cancel_resource_local(res, cancels, &policy,
+ mode, 0, 0, NULL);
+ ldlm_resource_putref(res);
+ RETURN(count);
+}
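+/* The callers below follow one pattern: collect the matching local locks
+ * with mdc_resource_get_unused(), reserve buffer space for their handles
+ * via ldlm_request_bufsize(), and pack them into the main RPC with
+ * ldlm_cli_cancel_list(); if the request cannot be allocated, the lock
+ * references are dropped with ldlm_lock_list_put(). */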
+
/* If mdc_setattr is called with an 'iattr', then it is a normal RPC that
* should take the normal semaphore and go to the normal portal.
*
void *ea, int ealen, void *ea2, int ea2len,
struct ptlrpc_request **request)
{
+ CFS_LIST_HEAD(cancels);
struct ptlrpc_request *req;
struct mdt_rec_setattr *rec;
struct mdc_rpc_lock *rpc_lock;
struct obd_device *obd = exp->exp_obd;
- int size[6] = { sizeof(struct ptlrpc_body),
- sizeof(*rec), 0, 0, ealen, ea2len };
- int bufcount = 4, rc;
+ int size[7] = { sizeof(struct ptlrpc_body),
+ sizeof(*rec), 0, 0, ealen, ea2len, 0 };
+ int count = 0, bufcount = 4, rc;
+ __u64 bits;
ENTRY;
LASSERT(op_data != NULL);
size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
- sizeof(struct lustre_capa) : 0;
+ sizeof(struct lustre_capa) : 0;
if (op_data->op_flags & (MF_SOM_CHANGE | MF_EPOCH_OPEN))
size[REQ_REC_OFF + 2] = sizeof(struct mdt_epoch);
bufcount++;
}
+ bits = MDS_INODELOCK_UPDATE;
+ if (op_data->op_attr.ia_valid & (ATTR_MODE|ATTR_UID|ATTR_GID))
+ bits |= MDS_INODELOCK_LOOKUP;
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
+ (fid_is_sane(&op_data->op_fid1)))
+ count = mdc_resource_get_unused(exp, &op_data->op_fid1,
+ &cancels, LCK_EX, bits);
+ if (exp_connect_cancelset(exp) && count) {
+ bufcount = 7;
+ size[REQ_REC_OFF + 5] = ldlm_request_bufsize(count, MDS_REINT);
+ }
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
MDS_REINT, bufcount, size, NULL);
+ if (req)
+ ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 5, 0);
+ else
+ ldlm_lock_list_put(&cancels, l_bl_ast, count);
+
if (req == NULL)
RETURN(-ENOMEM);
const void *data, int datalen, int mode, __u32 uid, __u32 gid,
__u32 cap_effective, __u64 rdev, struct ptlrpc_request **request)
{
- int size[5] = { sizeof(struct ptlrpc_body),
+ int size[6] = { sizeof(struct ptlrpc_body),
sizeof(struct mdt_rec_create),
- 0, op_data->op_namelen + 1 };
+ 0, op_data->op_namelen + 1, 0, 0 };
struct obd_device *obd = exp->exp_obd;
int level, bufcount = 4, rc;
struct ptlrpc_request *req;
+ int count = 0;
+ CFS_LIST_HEAD(cancels);
ENTRY;
/* For case if upper layer did not alloc fid, do it now. */
bufcount++;
}
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
+ (fid_is_sane(&op_data->op_fid1)))
+ count = mdc_resource_get_unused(exp, &op_data->op_fid1,
+ &cancels, LCK_EX,
+ MDS_INODELOCK_UPDATE);
+ if (exp_connect_cancelset(exp) && count) {
+ bufcount = 6;
+ size[REQ_REC_OFF + 4] = ldlm_request_bufsize(count, MDS_REINT);
+ }
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
MDS_REINT, bufcount, size, NULL);
+ if (req)
+ ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 4, 0);
+ else
+ ldlm_lock_list_put(&cancels, l_bl_ast, count);
+
if (req == NULL)
RETURN(-ENOMEM);
int mdc_unlink(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
+ CFS_LIST_HEAD(cancels);
struct obd_device *obd = class_exp2obd(exp);
struct ptlrpc_request *req = *request;
- int size[4] = { sizeof(struct ptlrpc_body),
+ int size[5] = { sizeof(struct ptlrpc_body),
sizeof(struct mdt_rec_unlink),
- 0, op_data->op_namelen + 1 };
- int rc;
+ 0, op_data->op_namelen + 1, 0 };
+ int count = 0, rc, bufcount = 4;
ENTRY;
LASSERT(req == NULL);
size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
- sizeof(struct lustre_capa) : 0;
+ sizeof(struct lustre_capa) : 0;
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
+ (fid_is_sane(&op_data->op_fid1)))
+ count = mdc_resource_get_unused(exp, &op_data->op_fid1,
+ &cancels, LCK_EX,
+ MDS_INODELOCK_UPDATE);
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
+ (fid_is_sane(&op_data->op_fid3)))
+ count += mdc_resource_get_unused(exp, &op_data->op_fid3,
+ &cancels, LCK_EX,
+ MDS_INODELOCK_FULL);
+ if (exp_connect_cancelset(exp) && count) {
+ bufcount = 5;
+ size[REQ_REC_OFF + 3] = ldlm_request_bufsize(count, MDS_REINT);
+ }
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_REINT, 4, size, NULL);
+ MDS_REINT, bufcount, size, NULL);
+ if (req)
+ ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 3, 0);
+ else
+ ldlm_lock_list_put(&cancels, l_bl_ast, count);
+
if (req == NULL)
RETURN(-ENOMEM);
*request = req;
int mdc_link(struct obd_export *exp, struct md_op_data *op_data,
struct ptlrpc_request **request)
{
+ CFS_LIST_HEAD(cancels);
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req;
- int size[5] = { sizeof(struct ptlrpc_body),
+ int size[6] = { sizeof(struct ptlrpc_body),
sizeof(struct mdt_rec_link),
- 0, 0, op_data->op_namelen + 1 };
- int rc;
+ 0, 0, op_data->op_namelen + 1, 0 };
+ int count = 0, rc, bufcount = 5;
ENTRY;
size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
size[REQ_REC_OFF + 2] = op_data->op_capa2 ?
sizeof(struct lustre_capa) : 0;
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
+ (fid_is_sane(&op_data->op_fid2)))
+ count = mdc_resource_get_unused(exp, &op_data->op_fid2,
+ &cancels, LCK_EX,
+ MDS_INODELOCK_UPDATE);
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
+ (fid_is_sane(&op_data->op_fid1)))
+ count += mdc_resource_get_unused(exp, &op_data->op_fid1,
+ &cancels, LCK_EX,
+ MDS_INODELOCK_UPDATE);
+ if (exp_connect_cancelset(exp) && count) {
+ bufcount = 6;
+ size[REQ_REC_OFF + 4] = ldlm_request_bufsize(count, MDS_REINT);
+ }
+
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_REINT, 5, size, NULL);
+ MDS_REINT, bufcount, size, NULL);
+ if (req)
+ ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 4, 0);
+ else
+ ldlm_lock_list_put(&cancels, l_bl_ast, count);
+
if (req == NULL)
RETURN(-ENOMEM);
const char *old, int oldlen, const char *new, int newlen,
struct ptlrpc_request **request)
{
+ CFS_LIST_HEAD(cancels);
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req;
- int size[6] = { sizeof(struct ptlrpc_body),
+ int size[7] = { sizeof(struct ptlrpc_body),
sizeof(struct mdt_rec_rename),
- 0, 0, oldlen + 1, newlen + 1 };
- int rc;
+ 0, 0, oldlen + 1, newlen + 1, 0 };
+ int count = 0, rc, bufcount = 6;
ENTRY;
size[REQ_REC_OFF + 1] = op_data->op_capa1 ?
size[REQ_REC_OFF + 2] = op_data->op_capa2 ?
sizeof(struct lustre_capa) : 0;
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID1) &&
+ (fid_is_sane(&op_data->op_fid1)))
+ count = mdc_resource_get_unused(exp, &op_data->op_fid1,
+ &cancels, LCK_EX,
+ MDS_INODELOCK_UPDATE);
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID2) &&
+ (fid_is_sane(&op_data->op_fid2)))
+ count += mdc_resource_get_unused(exp, &op_data->op_fid2,
+ &cancels, LCK_EX,
+ MDS_INODELOCK_UPDATE);
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID3) &&
+ (fid_is_sane(&op_data->op_fid3)))
+ count += mdc_resource_get_unused(exp, &op_data->op_fid3,
+ &cancels, LCK_EX,
+ MDS_INODELOCK_LOOKUP);
+ if ((op_data->op_flags & MF_MDC_CANCEL_FID4) &&
+ (fid_is_sane(&op_data->op_fid4)))
+ count += mdc_resource_get_unused(exp, &op_data->op_fid4,
+ &cancels, LCK_EX,
+ MDS_INODELOCK_FULL);
+ if (exp_connect_cancelset(exp) && count) {
+ bufcount = 7;
+ size[REQ_REC_OFF + 5] = ldlm_request_bufsize(count, MDS_REINT);
+ }
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
- MDS_REINT, 6, size, NULL);
+ MDS_REINT, bufcount, size, NULL);
+ if (req)
+ ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 5, 0);
+ else
+ ldlm_lock_list_put(&cancels, l_bl_ast, count);
+
if (req == NULL)
RETURN(-ENOMEM);
GOTO(err_close_lock, rc);
lprocfs_init_vars(mdc, &lvars);
lprocfs_obd_setup(obd, lvars.obd_vars);
+ ptlrpc_lprocfs_register_obd(obd);
rc = obd_llog_init(obd, NULL, obd, 0, NULL, NULL);
if (rc) {
struct obd_export *exp = req->rq_export;
struct ldlm_request *dlmreq =
lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq));
- struct lustre_handle remote_hdl = dlmreq->lock_handle1;
+ struct lustre_handle remote_hdl = dlmreq->lock_handle[0];
struct list_head *iter;
if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
return;
dlmreq = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
- remote_hdl = dlmreq->lock_handle1;
+ remote_hdl = dlmreq->lock_handle[0];
spin_lock(&exp->exp_ldlm_data.led_lock);
list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
RETURN(info->mti_epoch == NULL ? -EFAULT : 0);
}
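+/* Unpack the optional DLM request buffer that carries the handles of locks
+ * the client cancelled early. */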
+static inline int mdt_dlmreq_unpack(struct mdt_thread_info *info)
+{
+ struct req_capsule *pill = &info->mti_pill;
+
+ if (req_capsule_get_size(pill, &RMF_DLM_REQ, RCL_CLIENT)) {
+ info->mti_dlm_req = req_capsule_client_get(pill, &RMF_DLM_REQ);
+ if (info->mti_dlm_req == NULL)
+ RETURN(-EFAULT);
+ }
+
+ RETURN(0);
+}
+
static int mdt_setattr_unpack(struct mdt_thread_info *info)
{
struct md_attr *ma = &info->mti_attr;
/* Epoch may be absent */
mdt_epoch_unpack(info);
- if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
+ ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT);
+ if (ma->ma_lmm_size) {
ma->ma_lmm = req_capsule_client_get(pill, &RMF_EADATA);
- ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_EADATA,
- RCL_CLIENT);
ma->ma_valid |= MA_LOV;
}
- if (req_capsule_field_present(pill, &RMF_LOGCOOKIES, RCL_CLIENT)) {
- ma->ma_cookie = req_capsule_client_get(pill,
- &RMF_LOGCOOKIES);
- ma->ma_cookie_size = req_capsule_get_size(pill,
- &RMF_LOGCOOKIES,
- RCL_CLIENT);
+ ma->ma_cookie_size = req_capsule_get_size(pill, &RMF_LOGCOOKIES,
+ RCL_CLIENT);
+ if (ma->ma_cookie_size) {
+ ma->ma_cookie = req_capsule_client_get(pill, &RMF_LOGCOOKIES);
ma->ma_valid |= MA_COOKIE;
}
- RETURN(0);
+ rc = mdt_dlmreq_unpack(info);
+ RETURN(rc);
}
int mdt_close_unpack(struct mdt_thread_info *info)
struct mdt_reint_record *rr = &info->mti_rr;
struct req_capsule *pill = &info->mti_pill;
struct md_op_spec *sp = &info->mti_spec;
+ int rc;
ENTRY;
rec = req_capsule_client_get(pill, &RMF_REC_CREATE);
&RMF_EADATA,
RCL_CLIENT);
sp->u.sp_ea.fid = rr->rr_fid1;
+ RETURN(0);
}
+ req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
} else if (S_ISLNK(attr->la_mode)) {
const char *tgt = NULL;
req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SYM);
- if (req_capsule_field_present(pill, &RMF_SYMTGT, RCL_CLIENT)) {
+ if (req_capsule_get_size(pill, &RMF_SYMTGT, RCL_CLIENT)) {
tgt = req_capsule_client_get(pill, &RMF_SYMTGT);
sp->u.sp_symname = tgt;
}
if (tgt == NULL)
RETURN(-EFAULT);
+ } else {
+ req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL);
}
- RETURN(0);
+ rc = mdt_dlmreq_unpack(info);
+ RETURN(rc);
}
static int mdt_link_unpack(struct mdt_thread_info *info)
struct lu_attr *attr = &info->mti_attr.ma_attr;
struct mdt_reint_record *rr = &info->mti_rr;
struct req_capsule *pill = &info->mti_pill;
+ int rc;
ENTRY;
rec = req_capsule_client_get(pill, &RMF_REC_LINK);
info->mti_spec.sp_ck_split = !!(rec->lk_bias & MDS_CHECK_SPLIT);
info->mti_cross_ref = !!(rec->lk_bias & MDS_CROSS_REF);
- RETURN(0);
+ rc = mdt_dlmreq_unpack(info);
+ RETURN(rc);
}
static int mdt_unlink_unpack(struct mdt_thread_info *info)
struct lu_attr *attr = &info->mti_attr.ma_attr;
struct mdt_reint_record *rr = &info->mti_rr;
struct req_capsule *pill = &info->mti_pill;
+ int rc;
ENTRY;
rec = req_capsule_client_get(pill, &RMF_REC_UNLINK);
else
ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
- RETURN(0);
+ rc = mdt_dlmreq_unpack(info);
+ RETURN(rc);
}
static int mdt_rename_unpack(struct mdt_thread_info *info)
struct lu_attr *attr = &info->mti_attr.ma_attr;
struct mdt_reint_record *rr = &info->mti_rr;
struct req_capsule *pill = &info->mti_pill;
+ int rc;
ENTRY;
rec = req_capsule_client_get(pill, &RMF_REC_RENAME);
else
ma->ma_attr_flags &= ~MDS_VTX_BYPASS;
- RETURN(0);
+ rc = mdt_dlmreq_unpack(info);
+ RETURN(rc);
}
static int mdt_open_unpack(struct mdt_thread_info *info)
struct req_capsule *pill = &info->mti_pill;
struct mdt_reint_record *rr = &info->mti_rr;
struct ptlrpc_request *req = mdt_info_req(info);
+ struct md_op_spec *sp = &info->mti_spec;
ENTRY;
rec = req_capsule_client_get(pill, &RMF_REC_CREATE);
rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1;
LASSERT(rr->rr_namelen > 0);
- if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) {
- struct md_op_spec *sp = &info->mti_spec;
- sp->u.sp_ea.eadata = req_capsule_client_get(pill,
- &RMF_EADATA);
- sp->u.sp_ea.eadatalen = req_capsule_get_size(pill,
- &RMF_EADATA,
- RCL_CLIENT);
+ sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA,
+ RCL_CLIENT);
+ if (sp->u.sp_ea.eadatalen) {
+ sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA);
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
sp->u.sp_ea.no_lov_create = 1;
}
DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
(unsigned int)ma->ma_attr.la_valid);
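+ /* Cancel the locks whose handles the client packed into the request. */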
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(req, info->mti_dlm_req, 0);
+
repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
if (IS_ERR(mo))
if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
GOTO(out, rc = err_serious(-ESTALE));
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
+
switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
case S_IFDIR:{
/* Cross-ref case. */
DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1),
rr->rr_name);
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(req, info->mti_dlm_req, 0);
+
if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
GOTO(out, rc = err_serious(-ENOENT));
if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
GOTO(out, rc = err_serious(-ENOENT));
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(req, info->mti_dlm_req, 0);
+
if (info->mti_cross_ref) {
/* MDT holding name ask us to add ref. */
lhs = &info->mti_lh[MDT_LH_CHILD];
int rc;
ENTRY;
+ if (info->mti_dlm_req)
+ ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
+
if (info->mti_cross_ref) {
rc = mdt_reint_rename_tgt(info);
RETURN(rc);
return rc;
}
+/* Find and cancel locally unused locks matching @mode in the resource
+ * identified by the object id in @oa. Found locks are added to the @cancels
+ * list. Returns the number of locks added. */
+static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
+ struct list_head *cancels, ldlm_mode_t mode,
+ int lock_flags)
+{
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct ldlm_res_id res_id = { .name = { oa->o_id, 0, oa->o_gr, 0 } };
+ struct ldlm_resource *res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
+ int count;
+ ENTRY;
+
+ if (res == NULL)
+ RETURN(0);
+
+ count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
+ lock_flags, 0, NULL);
+ ldlm_resource_putref(res);
+ RETURN(count);
+}
+
/* Destroy requests can be async always on the client, and we don't even really
* care about the return code since the client cannot do anything at all about
* a destroy failure.
struct lov_stripe_md *ea, struct obd_trans_info *oti,
struct obd_export *md_export)
{
+ CFS_LIST_HEAD(cancels);
struct ptlrpc_request *req;
struct ost_body *body;
- int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), 0 };
+ int count, bufcount = 2;
ENTRY;
if (!oa) {
RETURN(-EINVAL);
}
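+ /* Collect conflicting PW extent locks; LDLM_FL_DISCARD_DATA lets
+ * cached dirty pages be discarded instead of written back, since
+ * the object is being destroyed anyway. */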
+ count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
+ LDLM_FL_DISCARD_DATA);
+ if (exp_connect_cancelset(exp) && count) {
+ bufcount = 3;
+ size[REQ_REC_OFF + 1] = ldlm_request_bufsize(count,OST_DESTROY);
+ }
req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,
- OST_DESTROY, 2, size, NULL);
+ OST_DESTROY, bufcount, size, NULL);
+ if (req)
+ ldlm_cli_cancel_list(&cancels, count, req, REQ_REC_OFF + 1, 0);
+ else
+ ldlm_lock_list_put(&cancels, l_bl_ast, count);
+
if (!req)
RETURN(-ENOMEM);
if (intent) {
int size[3] = {
[MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
- [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request) };
+ [DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request),
+ [DLM_LOCKREQ_OFF + 1] = 0 };
- req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
- LDLM_ENQUEUE, 2, size, NULL);
+ req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
if (req == NULL)
RETURN(-ENOMEM);
resp = &res_id;
}
- return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags,
- opaque);
+ return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
}
static int osc_join_lru(struct obd_export *exp,
if (body == NULL)
RETURN(-EFAULT);
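+ /* The request may carry packed cancel handles; cancel those locks
+ * before processing the destroy itself. */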
+ if (lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1)) {
+ struct ldlm_request *dlm;
+ dlm = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*dlm),
+ lustre_swab_ldlm_request);
+ if (dlm == NULL)
+ RETURN(-EFAULT);
+ ldlm_request_cancel(req, dlm, 0);
+ }
+
rc = lustre_pack_reply(req, 2, size, NULL);
if (rc)
RETURN(rc);
&RMF_REC_CREATE,
&RMF_CAPA1,
&RMF_NAME,
- &RMF_EADATA
+ &RMF_EADATA,
+ &RMF_DLM_REQ
};
static const struct req_msg_field *mds_reint_create_sym_client[] = {
&RMF_REC_CREATE,
&RMF_CAPA1,
&RMF_NAME,
- &RMF_SYMTGT
+ &RMF_SYMTGT,
+ &RMF_DLM_REQ
};
static const struct req_msg_field *mds_reint_create_slave_client[] = {
&RMF_PTLRPC_BODY,
&RMF_REC_UNLINK,
&RMF_CAPA1,
- &RMF_NAME
+ &RMF_NAME,
+ &RMF_DLM_REQ
};
static const struct req_msg_field *mds_reint_link_client[] = {
&RMF_REC_LINK,
&RMF_CAPA1,
&RMF_CAPA2,
- &RMF_NAME
+ &RMF_NAME,
+ &RMF_DLM_REQ
};
static const struct req_msg_field *mds_reint_rename_client[] = {
&RMF_CAPA1,
&RMF_CAPA2,
&RMF_NAME,
- &RMF_SYMTGT
+ &RMF_SYMTGT,
+ &RMF_DLM_REQ
};
static const struct req_msg_field *mds_last_unlink_server[] = {
&RMF_CAPA1,
&RMF_MDT_EPOCH,
&RMF_EADATA,
- &RMF_LOGCOOKIES
+ &RMF_LOGCOOKIES,
+ &RMF_DLM_REQ
};
static const struct req_msg_field *mds_connect_client[] = {
field->rmf_name, offset, lustre_msg_bufcount(msg), fmt->rf_name,
lustre_msg_buflen(msg, offset), field->rmf_size,
rcl_names[loc]);
+
return value;
}
void lustre_swab_ldlm_request (struct ldlm_request *rq)
{
__swab32s (&rq->lock_flags);
- CLASSERT(offsetof(typeof(*rq), lock_padding) != 0);
lustre_swab_ldlm_lock_desc (&rq->lock_desc);
- /* lock_handle1 opaque */
- /* lock_handle2 opaque */
+ __swab32s (&rq->lock_count);
+ /* lock_handle[] opaque */
}
void lustre_swab_ldlm_reply (struct ldlm_reply *r)
CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL);
CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL);
CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL);
+ CLASSERT(OBD_CONNECT_CANCELSET == 0x01000000ULL);
/* Checks for struct obdo */
LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
(long long)(int)offsetof(struct ldlm_request, lock_flags));
LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_flags) == 4, " found %lld\n",
(long long)(int)sizeof(((struct ldlm_request *)0)->lock_flags));
- LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_padding));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding));
+ LASSERTF((int)offsetof(struct ldlm_request, lock_count) == 4, " found %lld\n",
+ (long long)(int)offsetof(struct ldlm_request, lock_count));
+ LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_count) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_request *)0)->lock_count));
LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n",
(long long)(int)offsetof(struct ldlm_request, lock_desc));
LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n",
(long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc));
- LASSERTF((int)offsetof(struct ldlm_request, lock_handle1) == 88, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_handle1));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle1) == 8, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle1));
- LASSERTF((int)offsetof(struct ldlm_request, lock_handle2) == 96, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_handle2));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle2) == 8, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle2));
+ LASSERTF((int)offsetof(struct ldlm_request, lock_handle) == 88, " found %lld\n",
+ (long long)(int)offsetof(struct ldlm_request, lock_handle));
+ LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle) == 16, " found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle));
/* Checks for struct ldlm_reply */
LASSERTF((int)sizeof(struct ldlm_reply) == 112, " found %lld\n",
(long long)(int)offsetof(struct ldlm_reply, lock_flags));
LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_flags) == 4, " found %lld\n",
(long long)(int)sizeof(((struct ldlm_reply *)0)->lock_flags));
- LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_padding));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding));
- LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_desc));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc));
+ LASSERTF((int)offsetof(struct ldlm_reply, lock_padding) == 4, " found %lld\n",
+ (long long)(int)offsetof(struct ldlm_reply, lock_padding));
+ LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_padding) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_padding));
+ LASSERTF((int)offsetof(struct ldlm_reply, lock_desc) == 8, " found %lld\n",
+ (long long)(int)offsetof(struct ldlm_reply, lock_desc));
+ LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_desc) == 80, " found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_desc));
LASSERTF((int)offsetof(struct ldlm_reply, lock_handle) == 88, " found %lld\n",
(long long)(int)offsetof(struct ldlm_reply, lock_handle));
LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_handle) == 8, " found %lld\n",
## this test is very time-consuming, don't run it by default
#run_test 4a " FIDS/ nlink overflow test ============================="
+test_5a() {
+ mount_client $MOUNT2
+ # create a cross-ref file
+ mkdir -p $MOUNT/$tdir/d1
+ mkdir -p $MOUNT2/$tdir/d2
+ dd if=/dev/zero of=$MOUNT/$tdir/d1/f1 count=1
+ mv $MOUNT2/$tdir/d1/f1 $MOUNT2/$tdir/d2/
+ # XXX: a check that the file is really a cross-ref one is needed.
+ cancel_lru_locks mdc
+ cancel_lru_locks osc
+ dd if=$MOUNT2/$tdir/d2/f1 of=/dev/null
+ stat $MOUNT2/$tdir/d2 $MOUNT2/$tdir/d2/f1 > /dev/null
+ can1=`awk '/ldlm_cancel/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_canceld/stats`
+ blk1=`awk '/ldlm_bl_callback/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_cbd/stats`
+ unlink $MOUNT2/$tdir/d2/f1
+ can2=`awk '/ldlm_cancel/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_canceld/stats`
+ blk2=`awk '/ldlm_bl_callback/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_cbd/stats`
+ umount $MOUNT2
+ [ $can1 -eq $can2 ] && error "It does not look like a cross-ref file."
+ [ $((can1+1)) -eq $can2 ] || error $((can2-can1)) "cancel RPCs occurred."
+ [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPCs occurred."
+}
+run_test 5a "Early Lock Cancel: cross-ref unlink"
+
+test_5b() {
+ mount_client $MOUNT2
+ # create a cross-ref file
+ mkdir -p $MOUNT/$tdir/d1
+ mkdir -p $MOUNT2/$tdir/d2
+ dd if=/dev/zero of=$MOUNT/$tdir/d1/f1 count=1
+ cancel_lru_locks mdc
+ cancel_lru_locks osc
+ dd if=$MOUNT2/$tdir/d1/f1 of=/dev/null
+ stat $MOUNT2/$tdir/d1/f1 $MOUNT2/$tdir/d2 > /dev/null
+ can1=`awk '/ldlm_cancel/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_canceld/stats`
+ blk1=`awk '/ldlm_bl_callback/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_cbd/stats`
+ ln $MOUNT2/$tdir/d1/f1 $MOUNT2/$tdir/d2/f2
+ can2=`awk '/ldlm_cancel/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_canceld/stats`
+ blk2=`awk '/ldlm_bl_callback/ {print $2}' /proc/fs/lustre/ldlm/services/ldlm_cbd/stats`
+ umount $MOUNT2
+ [ $can1 -eq $can2 ] && error "It does not look like a cross-ref file."
+ [ $((can1+1)) -eq $can2 ] || error $((can2-can1)) "cancel RPCs occurred."
+ [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPCs occurred."
+}
+run_test 5b "Early Lock Cancel: cross-ref link"
+
TMPDIR=$OLDTMPDIR
TMP=$OLDTMP
HOME=$OLDHOME
}
run_test 119b "Sparse directIO read must return actual read amount"
+test_120a() {
+ mkdir $DIR/$tdir
+ cancel_lru_locks mdc
+ stat $DIR/$tdir > /dev/null
+ can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ mkdir $DIR/$tdir/d1
+ can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPCs occurred."
+ [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPCs occurred."
+}
+run_test 120a "Early Lock Cancel: mkdir test"
+
+test_120b() {
+ mkdir $DIR/$tdir
+ cancel_lru_locks mdc
+ stat $DIR/$tdir > /dev/null
+ can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ touch $DIR/$tdir/f1
+ blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPCs occurred."
+ [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPCs occurred."
+}
+run_test 120b "Early Lock Cancel: create test"
+
+test_120c() {
+ mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2
+ touch $DIR/$tdir/d1/f1
+ cancel_lru_locks mdc
+ stat $DIR/$tdir/d1 $DIR/$tdir/d2 $DIR/$tdir/d1/f1 > /dev/null
+ can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ ln $DIR/$tdir/d1/f1 $DIR/$tdir/d2/f2
+ can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPCs occurred."
+ [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPCs occurred."
+}
+run_test 120c "Early Lock Cancel: link test"
+
+test_120d() {
+ touch $DIR/$tdir
+ cancel_lru_locks mdc
+ stat $DIR/$tdir > /dev/null
+ can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ chmod a+x $DIR/$tdir
+ can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPCs occurred."
+ [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPCs occurred."
+}
+run_test 120d "Early Lock Cancel: setattr test"
+
+test_120e() {
+ mkdir $DIR/$tdir
+ dd if=/dev/zero of=$DIR/$tdir/f1 count=1
+ cancel_lru_locks mdc
+ cancel_lru_locks osc
+ dd if=$DIR/$tdir/f1 of=/dev/null
+ stat $DIR/$tdir $DIR/$tdir/f1 > /dev/null
+ can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ unlink $DIR/$tdir/f1
+ can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPCs occurred."
+ [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPCs occurred."
+}
+run_test 120e "Early Lock Cancel: unlink test"
+
+test_120f() {
+ mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2
+ dd if=/dev/zero of=$DIR/$tdir/d1/f1 count=1
+ dd if=/dev/zero of=$DIR/$tdir/d2/f2 count=1
+ cancel_lru_locks mdc
+ cancel_lru_locks osc
+ dd if=$DIR/$tdir/d1/f1 of=/dev/null
+ dd if=$DIR/$tdir/d2/f2 of=/dev/null
+ stat $DIR/$tdir/d1 $DIR/$tdir/d2 $DIR/$tdir/d1/f1 $DIR/$tdir/d2/f2 > /dev/null
+ can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ mv $DIR/$tdir/d1/f1 $DIR/$tdir/d2/f2
+ can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ [ $can1 -eq $can2 ] || error $((can2-can1)) "cancel RPCs occurred."
+ [ $blk1 -eq $blk2 ] || error $((blk2-blk1)) "blocking RPCs occurred."
+}
+run_test 120f "Early Lock Cancel: rename test"
+
+test_120g() {
+ count=10000
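+ # Report the cancel/blocking RPC counts and timings around a mass
+ # create and remove.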
+ echo create $count files
+ mkdir $DIR/$tdir
+ cancel_lru_locks mdc
+ cancel_lru_locks osc
+ t0=`date +%s`
+
+ can0=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk0=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ createmany -o $DIR/$tdir/f $count
+ sync
+ can1=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk1=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ t1=`date +%s`
+ echo total: $count creates in $((t1-t0)) seconds
+ echo total: $((can1-can0)) cancels, $((blk1-blk0)) blockings
+ echo rm $count files
+ rm -r $DIR/$tdir
+ sync
+ can2=`awk '/ldlm_cancel/ {print $2}' $LPROC/ldlm/services/ldlm_canceld/stats`
+ blk2=`awk '/ldlm_bl_callback/ {print $2}' $LPROC/ldlm/services/ldlm_cbd/stats`
+ t2=`date +%s`
+ echo total: $count removes in $((t2-t1)) seconds
+ echo total: $((can2-can1)) cancels, $((blk2-blk1)) blockings
+ # wait for the removals to be committed
+ sleep 2
+}
+run_test 120g "Early Lock Cancel: performance test"
+
TMPDIR=$OLDTMPDIR
TMP=$OLDTMP
HOME=$OLDHOME
CHECK_CDEFINE(OBD_CONNECT_OSS_CAPA);
CHECK_CDEFINE(OBD_CONNECT_MDS_MDS);
CHECK_CDEFINE(OBD_CONNECT_SOM);
+ CHECK_CDEFINE(OBD_CONNECT_CANCELSET);
}
static void
BLANK_LINE();
CHECK_STRUCT(ldlm_request);
CHECK_MEMBER(ldlm_request, lock_flags);
- CHECK_MEMBER(ldlm_request, lock_padding);
+ CHECK_MEMBER(ldlm_request, lock_count);
CHECK_MEMBER(ldlm_request, lock_desc);
- CHECK_MEMBER(ldlm_request, lock_handle1);
- CHECK_MEMBER(ldlm_request, lock_handle2);
+ CHECK_MEMBER(ldlm_request, lock_handle);
}
static void
BLANK_LINE();
CHECK_STRUCT(ldlm_reply);
CHECK_MEMBER(ldlm_reply, lock_flags);
- CHECK_MEMBER(ldlm_request, lock_padding);
- CHECK_MEMBER(ldlm_request, lock_desc);
+ CHECK_MEMBER(ldlm_reply, lock_padding);
+ CHECK_MEMBER(ldlm_reply, lock_desc);
CHECK_MEMBER(ldlm_reply, lock_handle);
CHECK_MEMBER(ldlm_reply, lock_policy_res1);
CHECK_MEMBER(ldlm_reply, lock_policy_res2);
CLASSERT(OBD_CONNECT_OSS_CAPA == 0x00200000ULL);
CLASSERT(OBD_CONNECT_MDS_MDS == 0x00400000ULL);
CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL);
+ CLASSERT(OBD_CONNECT_CANCELSET == 0x01000000ULL);
/* Checks for struct obdo */
LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
(long long)(int)offsetof(struct ldlm_request, lock_flags));
LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_flags) == 4, " found %lld\n",
(long long)(int)sizeof(((struct ldlm_request *)0)->lock_flags));
- LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_padding));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding));
+ LASSERTF((int)offsetof(struct ldlm_request, lock_count) == 4, " found %lld\n",
+ (long long)(int)offsetof(struct ldlm_request, lock_count));
+ LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_count) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_request *)0)->lock_count));
LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n",
(long long)(int)offsetof(struct ldlm_request, lock_desc));
LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n",
(long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc));
- LASSERTF((int)offsetof(struct ldlm_request, lock_handle1) == 88, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_handle1));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle1) == 8, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle1));
- LASSERTF((int)offsetof(struct ldlm_request, lock_handle2) == 96, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_handle2));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle2) == 8, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle2));
+ LASSERTF((int)offsetof(struct ldlm_request, lock_handle) == 88, " found %lld\n",
+ (long long)(int)offsetof(struct ldlm_request, lock_handle));
+ LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_handle) == 16, " found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_request *)0)->lock_handle));
/* Checks for struct ldlm_reply */
LASSERTF((int)sizeof(struct ldlm_reply) == 112, " found %lld\n",
(long long)(int)offsetof(struct ldlm_reply, lock_flags));
LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_flags) == 4, " found %lld\n",
(long long)(int)sizeof(((struct ldlm_reply *)0)->lock_flags));
- LASSERTF((int)offsetof(struct ldlm_request, lock_padding) == 4, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_padding));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_padding) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_padding));
- LASSERTF((int)offsetof(struct ldlm_request, lock_desc) == 8, " found %lld\n",
- (long long)(int)offsetof(struct ldlm_request, lock_desc));
- LASSERTF((int)sizeof(((struct ldlm_request *)0)->lock_desc) == 80, " found %lld\n",
- (long long)(int)sizeof(((struct ldlm_request *)0)->lock_desc));
+ LASSERTF((int)offsetof(struct ldlm_reply, lock_padding) == 4, " found %lld\n",
+ (long long)(int)offsetof(struct ldlm_reply, lock_padding));
+ LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_padding) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_padding));
+ LASSERTF((int)offsetof(struct ldlm_reply, lock_desc) == 8, " found %lld\n",
+ (long long)(int)offsetof(struct ldlm_reply, lock_desc));
+ LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_desc) == 80, " found %lld\n",
+ (long long)(int)sizeof(((struct ldlm_reply *)0)->lock_desc));
LASSERTF((int)offsetof(struct ldlm_reply, lock_handle) == 88, " found %lld\n",
(long long)(int)offsetof(struct ldlm_reply, lock_handle));
LASSERTF((int)sizeof(((struct ldlm_reply *)0)->lock_handle) == 8, " found %lld\n",