typedef int (*ldlm_glimpse_callback)(struct ldlm_lock *lock, void *data);
typedef unsigned long (*ldlm_weigh_callback)(struct ldlm_lock *lock);
+struct ldlm_glimpse_work {
+ struct ldlm_lock *gl_lock; /* lock to glimpse */
+ cfs_list_t gl_list; /* linkage to other gl work structs */
+ __u32 gl_flags;/* see LDLM_GL_WORK_* below */
+};
+
+/* the ldlm_glimpse_work is allocated on the stack and should not be freed */
+#define LDLM_GL_WORK_NOFREE 0x1
+
/* Interval node data for each LDLM_EXTENT lock */
struct ldlm_interval {
struct interval_node li_node; /* node for tree mgmt */
void *data, int flag);
int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data);
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data);
+int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list);
int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback,
ldlm_blocking_callback, ldlm_glimpse_callback);
int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
ENTRY;
LASSERT(lock != NULL);
- if (rc != 0) {
- rc = ldlm_handle_ast_error(lock, req, rc,
- arg->type == LDLM_BL_CALLBACK
- ? "blocking" : "completion");
- if (rc == -ERESTART)
- cfs_atomic_inc(&arg->restart);
- }
+
+ switch (arg->type) {
+ case LDLM_GL_CALLBACK:
+ /* Update the LVB from disk if the AST failed
+ * (this is a legal race)
+ *
+ * - Glimpse callback of local lock just returns
+ * -ELDLM_NO_LOCK_DATA.
+ * - Glimpse callback of remote lock might return
+ * -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
+ */
+ if (rc == -ELDLM_NO_LOCK_DATA) {
+ LDLM_DEBUG(lock, "lost race - client has a lock but no "
+ "inode");
+ ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
+ } else if (rc != 0) {
+ rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
+ } else {
+ rc = ldlm_res_lvbo_update(lock->l_resource, req, 1);
+ }
+ break;
+ case LDLM_BL_CALLBACK:
+ if (rc != 0)
+ rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+ break;
+ case LDLM_CP_CALLBACK:
+ if (rc != 0)
+ rc = ldlm_handle_ast_error(lock, req, rc, "completion");
+ break;
+ default:
+ LDLM_ERROR(lock, "invalid opcode for lock callback %d",
+ arg->type);
+ LBUG();
+ }
+
+ /* release extra reference taken in ldlm_ast_fini() */
LDLM_LOCK_RELEASE(lock);
+ if (rc == -ERESTART)
+ cfs_atomic_inc(&arg->restart);
+
RETURN(0);
}
-static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
- struct ldlm_cb_set_arg *arg,
- struct ldlm_lock *lock,
- int instant_cancel)
+static inline int ldlm_ast_fini(struct ptlrpc_request *req,
+ struct ldlm_cb_set_arg *arg,
+ struct ldlm_lock *lock,
+ int instant_cancel)
{
int rc = 0;
ENTRY;
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+ rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+ rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
RETURN(rc);
}
int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
{
- struct ldlm_resource *res = lock->l_resource;
- struct ldlm_request *body;
- struct ptlrpc_request *req;
- int rc;
+ struct ldlm_cb_set_arg *arg = data;
+ struct ldlm_request *body;
+ struct ptlrpc_request *req;
+ struct ldlm_cb_async_args *ca;
+ int rc;
ENTRY;
LASSERT(lock != NULL);
body->lock_handle[0] = lock->l_remote_handle;
ldlm_lock2desc(lock, &body->lock_desc);
+ CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+ ca = ptlrpc_req_async_args(req);
+ ca->ca_set_arg = arg;
+ ca->ca_lock = lock;
+
/* server namespace, doesn't need lock */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
lock->l_resource->lr_lvb_len);
- res = lock->l_resource;
ptlrpc_request_set_replen(req);
-
req->rq_send_state = LUSTRE_IMP_FULL;
/* ptlrpc_request_alloc_pack already set timeout */
if (AT_OFF)
req->rq_timeout = ldlm_get_rq_timeout();
+ req->rq_interpret_reply = ldlm_cb_interpret;
+
if (lock->l_export && lock->l_export->exp_nid_stats &&
lock->l_export->exp_nid_stats->nid_ldlm_stats)
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
- rc = ptlrpc_queue_wait(req);
- /* Update the LVB from disk if the AST failed (this is a legal race)
- *
- * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA.
- * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
- * when inode is cleared. LU-274
- */
- if (rc == -ELDLM_NO_LOCK_DATA) {
- LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
- ldlm_res_lvbo_update(res, NULL, 1);
- } else if (rc != 0) {
- rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
- } else {
- rc = ldlm_res_lvbo_update(res, req, 1);
- }
+ rc = ldlm_ast_fini(req, arg, lock, 0);
- ptlrpc_req_finished(req);
- if (rc == -ERESTART)
- ldlm_reprocess_all(res);
+ RETURN(rc);
+}
- RETURN(rc);
+int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list)
+{
+ int rc;
+ ENTRY;
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
+ LDLM_WORK_GL_AST);
+ if (rc == -ERESTART)
+ ldlm_reprocess_all(res);
+
+ RETURN(rc);
}
static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
EXPORT_SYMBOL(ldlm_server_blocking_ast);
EXPORT_SYMBOL(ldlm_server_completion_ast);
EXPORT_SYMBOL(ldlm_server_glimpse_ast);
+EXPORT_SYMBOL(ldlm_glimpse_locks);
EXPORT_SYMBOL(ldlm_handle_enqueue);
EXPORT_SYMBOL(ldlm_handle_enqueue0);
EXPORT_SYMBOL(ldlm_handle_cancel);
__u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
[DLM_LOCKREPLY_OFF] = sizeof(*rep),
[DLM_REPLY_REC_OFF] = sizeof(*reply_lvb) };
+ struct ldlm_glimpse_work gl_work;
+ CFS_LIST_HEAD(gl_list);
ENTRY;
policy = ldlm_get_processing_policy(res);
}
LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
- rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+
+ /* Populate the gl_work structure.
+ * Grab additional reference on the lock which will be released in
+ * ldlm_work_gl_ast_lock() */
+ gl_work.gl_lock = LDLM_LOCK_GET(l);
+ /* The glimpse callback is sent to one single extent lock. As a result,
+ * the gl_work list is just composed of one element */
+ cfs_list_add_tail(&gl_work.gl_list, &gl_list);
+ /* the ldlm_glimpse_work structure is allocated on the stack */
+ gl_work.gl_flags = LDLM_GL_WORK_NOFREE;
+
+ rc = ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */
+
+ if (!cfs_list_empty(&gl_list))
+ LDLM_LOCK_RELEASE(l);
lock_res(res);
*reply_lvb = *res_lvb;
[DLM_LOCKREPLY_OFF] = sizeof(*rep),
[DLM_REPLY_REC_OFF] = sizeof(*reply_lvb)
};
-
+ struct ldlm_glimpse_work gl_work;
+ CFS_LIST_HEAD(gl_list);
ENTRY;
policy = ldlm_get_processing_policy(res);
goto out;
}
- LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
- rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+ /* Populate the gl_work structure.
+ * Grab additional reference on the lock which will be released in
+ * ldlm_work_gl_ast_lock() */
+ gl_work.gl_lock = LDLM_LOCK_GET(l);
+ /* The glimpse callback is sent to one single extent lock. As a result,
+ * the gl_work list is just composed of one element */
+ cfs_list_add_tail(&gl_work.gl_list, &gl_list);
+ /* the ldlm_glimpse_work structure is allocated on the stack */
+ gl_work.gl_flags = LDLM_GL_WORK_NOFREE;
+
+ rc = ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */
+
+ if (!cfs_list_empty(&gl_list))
+ LDLM_LOCK_RELEASE(l);
lock_res(res);
*reply_lvb = *res_lvb;