Whamcloud - gitweb
LU-1842 ldlm: support for sending GL ASTs to multiple locks
authorJohann Lombardi <johann@whamcloud.com>
Thu, 6 Sep 2012 12:09:05 +0000 (14:09 +0200)
committerOleg Drokin <green@whamcloud.com>
Fri, 14 Sep 2012 05:24:28 +0000 (01:24 -0400)
Traditionally, a glimpse callback is sent to one single lock which
is the highest PW extent lock.
The new quota code uses glimpse callbacks in a different manner since
it can issue multiple glimpse ASTs to different slaves (as done for
blocking callbacks for instance) to notify those latters of the new
qunit value.

This patch modifies the glimpse callback path to use
ldlm_run_ast_work() to issue multiple glimpse ASTs in one shot (as
done currently for other lock callbacks).

Signed-off-by: Johann Lombardi <johann@whamcloud.com>
Change-Id: I972630fcf3158dd1d361392913f8086f1ebaa39f
Reviewed-on: http://review.whamcloud.com/3887
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Tested-by: Hudson
Reviewed-by: Fan Yong <yong.fan@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/obdfilter/filter.c
lustre/ofd/ofd_dlm.c

index f70034a..480bced 100644 (file)
@@ -568,6 +568,15 @@ typedef int (*ldlm_completion_callback)(struct ldlm_lock *lock, int flags,
 typedef int (*ldlm_glimpse_callback)(struct ldlm_lock *lock, void *data);
 typedef unsigned long (*ldlm_weigh_callback)(struct ldlm_lock *lock);
 
 typedef int (*ldlm_glimpse_callback)(struct ldlm_lock *lock, void *data);
 typedef unsigned long (*ldlm_weigh_callback)(struct ldlm_lock *lock);
 
+struct ldlm_glimpse_work {
+       struct ldlm_lock        *gl_lock; /* lock to glimpse */
+       cfs_list_t               gl_list; /* linkage to other gl work structs */
+       __u32                    gl_flags;/* see LDLM_GL_WORK_* below */
+};
+
+/* the ldlm_glimpse_work is allocated on the stack and should not be freed */
+#define LDLM_GL_WORK_NOFREE 0x1
+
 /* Interval node data for each LDLM_EXTENT lock */
 struct ldlm_interval {
         struct interval_node li_node;   /* node for tree mgmt */
 /* Interval node data for each LDLM_EXTENT lock */
 struct ldlm_interval {
         struct interval_node li_node;   /* node for tree mgmt */
@@ -994,6 +1003,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *, struct ldlm_lock_desc *,
                              void *data, int flag);
 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data);
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data);
                              void *data, int flag);
 int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data);
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data);
+int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list);
 int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback,
                         ldlm_blocking_callback, ldlm_glimpse_callback);
 int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
 int ldlm_handle_enqueue(struct ptlrpc_request *req, ldlm_completion_callback,
                         ldlm_blocking_callback, ldlm_glimpse_callback);
 int ldlm_handle_enqueue0(struct ldlm_namespace *ns, struct ptlrpc_request *req,
index 237152d..65c1331 100644 (file)
@@ -95,16 +95,17 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns);
 /* ldlm_lock.c */
 
 struct ldlm_cb_set_arg {
 /* ldlm_lock.c */
 
 struct ldlm_cb_set_arg {
-       struct ptlrpc_request_set *set;
-       int                        type; /* LDLM_{CP,BL}_CALLBACK */
-       cfs_atomic_t               restart;
-       cfs_list_t                *list;
+       struct ptlrpc_request_set       *set;
+       int                              type; /* LDLM_{CP,BL,GL}_CALLBACK */
+       cfs_atomic_t                     restart;
+       cfs_list_t                      *list;
 };
 
 typedef enum {
 };
 
 typedef enum {
-        LDLM_WORK_BL_AST,
-        LDLM_WORK_CP_AST,
-        LDLM_WORK_REVOKE_AST
+       LDLM_WORK_BL_AST,
+       LDLM_WORK_CP_AST,
+       LDLM_WORK_REVOKE_AST,
+       LDLM_WORK_GL_AST
 } ldlm_desc_ast_t;
 
 void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list);
 } ldlm_desc_ast_t;
 
 void ldlm_grant_lock(struct ldlm_lock *lock, cfs_list_t *work_list);
@@ -127,6 +128,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
 #endif
 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type);
 #endif
 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type);
+int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq);
 int ldlm_lock_remove_from_lru(struct ldlm_lock *lock);
 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock);
 void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock);
 int ldlm_lock_remove_from_lru(struct ldlm_lock *lock);
 int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock);
 void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock);
index bb17c8a..93ccbee 100644 (file)
@@ -1618,6 +1618,33 @@ ldlm_work_revoke_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
+{
+       struct ldlm_cb_set_arg          *arg = opaq;
+       struct ldlm_glimpse_work        *gl_work;
+       struct ldlm_lock                *lock;
+       int                              rc = 0;
+       ENTRY;
+
+       if (cfs_list_empty(arg->list))
+               RETURN(-ENOENT);
+
+       gl_work = cfs_list_entry(arg->list->next, struct ldlm_glimpse_work,
+                                gl_list);
+       cfs_list_del_init(&gl_work->gl_list);
+
+       lock = gl_work->gl_lock;
+       if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
+               rc = 1;
+
+       LDLM_LOCK_RELEASE(lock);
+
+       if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
+               OBD_FREE_PTR(gl_work);
+
+       RETURN(rc);
+}
+
 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type)
 {
 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type)
 {
@@ -1648,6 +1675,10 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                        arg->type = LDLM_BL_CALLBACK;
                        work_ast_lock = ldlm_work_revoke_ast_lock;
                        break;
                        arg->type = LDLM_BL_CALLBACK;
                        work_ast_lock = ldlm_work_revoke_ast_lock;
                        break;
+               case LDLM_WORK_GL_AST:
+                       arg->type = LDLM_GL_CALLBACK;
+                       work_ast_lock = ldlm_work_gl_ast_lock;
+                       break;
                default:
                        LBUG();
        }
                default:
                        LBUG();
        }
index 740eae7..6a6eabc 100644 (file)
@@ -699,22 +699,54 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         ENTRY;
 
         LASSERT(lock != NULL);
         ENTRY;
 
         LASSERT(lock != NULL);
-        if (rc != 0) {
-                rc = ldlm_handle_ast_error(lock, req, rc,
-                                           arg->type == LDLM_BL_CALLBACK
-                                           ? "blocking" : "completion");
-                if (rc == -ERESTART)
-                        cfs_atomic_inc(&arg->restart);
-        }
+
+       switch (arg->type) {
+       case LDLM_GL_CALLBACK:
+               /* Update the LVB from disk if the AST failed
+                * (this is a legal race)
+                *
+                * - Glimpse callback of local lock just returns
+                *   -ELDLM_NO_LOCK_DATA.
+                * - Glimpse callback of remote lock might return
+                *   -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
+                */
+               if (rc == -ELDLM_NO_LOCK_DATA) {
+                       LDLM_DEBUG(lock, "lost race - client has a lock but no "
+                                  "inode");
+                       ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
+               } else if (rc != 0) {
+                       rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
+               } else {
+                       rc = ldlm_res_lvbo_update(lock->l_resource, req, 1);
+               }
+               break;
+       case LDLM_BL_CALLBACK:
+               if (rc != 0)
+                       rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+               break;
+       case LDLM_CP_CALLBACK:
+               if (rc != 0)
+                       rc = ldlm_handle_ast_error(lock, req, rc, "completion");
+               break;
+       default:
+               LDLM_ERROR(lock, "invalid opcode for lock callback %d",
+                          arg->type);
+               LBUG();
+       }
+
+       /* release extra reference taken in ldlm_ast_fini() */
         LDLM_LOCK_RELEASE(lock);
 
         LDLM_LOCK_RELEASE(lock);
 
+       if (rc == -ERESTART)
+               cfs_atomic_inc(&arg->restart);
+
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
-                                          struct ldlm_cb_set_arg *arg,
-                                          struct ldlm_lock *lock,
-                                          int instant_cancel)
+static inline int ldlm_ast_fini(struct ptlrpc_request *req,
+                               struct ldlm_cb_set_arg *arg,
+                               struct ldlm_lock *lock,
+                               int instant_cancel)
 {
        int rc = 0;
        ENTRY;
 {
        int rc = 0;
        ENTRY;
@@ -853,7 +885,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
 
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+       rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -968,17 +1000,18 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
 
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+       rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
 
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
 
         RETURN(rc);
 }
 
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
-        struct ldlm_resource  *res = lock->l_resource;
-        struct ldlm_request   *body;
-        struct ptlrpc_request *req;
-        int                    rc;
+       struct ldlm_cb_set_arg          *arg = data;
+       struct ldlm_request             *body;
+       struct ptlrpc_request           *req;
+       struct ldlm_cb_async_args       *ca;
+       int                              rc;
         ENTRY;
 
         LASSERT(lock != NULL);
         ENTRY;
 
         LASSERT(lock != NULL);
@@ -994,44 +1027,44 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         body->lock_handle[0] = lock->l_remote_handle;
         ldlm_lock2desc(lock, &body->lock_desc);
 
         body->lock_handle[0] = lock->l_remote_handle;
         ldlm_lock2desc(lock, &body->lock_desc);
 
+       CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+       ca = ptlrpc_req_async_args(req);
+       ca->ca_set_arg = arg;
+       ca->ca_lock = lock;
+
         /* server namespace, doesn't need lock */
         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                              lock->l_resource->lr_lvb_len);
         /* server namespace, doesn't need lock */
         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                              lock->l_resource->lr_lvb_len);
-        res = lock->l_resource;
         ptlrpc_request_set_replen(req);
 
         ptlrpc_request_set_replen(req);
 
-
         req->rq_send_state = LUSTRE_IMP_FULL;
         /* ptlrpc_request_alloc_pack already set timeout */
         if (AT_OFF)
                 req->rq_timeout = ldlm_get_rq_timeout();
 
         req->rq_send_state = LUSTRE_IMP_FULL;
         /* ptlrpc_request_alloc_pack already set timeout */
         if (AT_OFF)
                 req->rq_timeout = ldlm_get_rq_timeout();
 
+       req->rq_interpret_reply = ldlm_cb_interpret;
+
         if (lock->l_export && lock->l_export->exp_nid_stats &&
             lock->l_export->exp_nid_stats->nid_ldlm_stats)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
 
         if (lock->l_export && lock->l_export->exp_nid_stats &&
             lock->l_export->exp_nid_stats->nid_ldlm_stats)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ptlrpc_queue_wait(req);
-        /* Update the LVB from disk if the AST failed (this is a legal race)
-         *
-         * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA.
-         * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
-         *   when inode is cleared. LU-274
-         */
-        if (rc == -ELDLM_NO_LOCK_DATA) {
-                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
-                ldlm_res_lvbo_update(res, NULL, 1);
-        } else if (rc != 0) {
-                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
-        } else {
-                rc = ldlm_res_lvbo_update(res, req, 1);
-        }
+       rc = ldlm_ast_fini(req, arg, lock, 0);
 
 
-        ptlrpc_req_finished(req);
-        if (rc == -ERESTART)
-                ldlm_reprocess_all(res);
+       RETURN(rc);
+}
 
 
-        RETURN(rc);
+int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list)
+{
+       int     rc;
+       ENTRY;
+
+       rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
+                              LDLM_WORK_GL_AST);
+       if (rc == -ERESTART)
+               ldlm_reprocess_all(res);
+
+       RETURN(rc);
 }
 
 static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
 }
 
 static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
@@ -2922,6 +2955,7 @@ EXPORT_SYMBOL(ldlm_cli_cancel_list);
 EXPORT_SYMBOL(ldlm_server_blocking_ast);
 EXPORT_SYMBOL(ldlm_server_completion_ast);
 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
 EXPORT_SYMBOL(ldlm_server_blocking_ast);
 EXPORT_SYMBOL(ldlm_server_completion_ast);
 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
+EXPORT_SYMBOL(ldlm_glimpse_locks);
 EXPORT_SYMBOL(ldlm_handle_enqueue);
 EXPORT_SYMBOL(ldlm_handle_enqueue0);
 EXPORT_SYMBOL(ldlm_handle_cancel);
 EXPORT_SYMBOL(ldlm_handle_enqueue);
 EXPORT_SYMBOL(ldlm_handle_enqueue0);
 EXPORT_SYMBOL(ldlm_handle_cancel);
index 23a992e..ef25f36 100644 (file)
@@ -1698,6 +1698,8 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                            [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                            [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb) };
         __u32 repsize[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                            [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                            [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb) };
+       struct ldlm_glimpse_work        gl_work;
+       CFS_LIST_HEAD(gl_list);
         ENTRY;
 
         policy = ldlm_get_processing_policy(res);
         ENTRY;
 
         policy = ldlm_get_processing_policy(res);
@@ -1820,7 +1822,21 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         }
 
         LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
         }
 
         LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
-        rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+
+       /* Populate the gl_work structure.
+        * Grab additional reference on the lock which will be released in
+        * ldlm_work_gl_ast_lock() */
+       gl_work.gl_lock = LDLM_LOCK_GET(l);
+       /* The glimpse callback is sent to one single extent lock. As a result,
+        * the gl_work list is just composed of one element */
+       cfs_list_add_tail(&gl_work.gl_list, &gl_list);
+       /* the ldlm_glimpse_work structure is allocated on the stack */
+       gl_work.gl_flags = LDLM_GL_WORK_NOFREE;
+
+       rc = ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */
+
+       if (!cfs_list_empty(&gl_list))
+               LDLM_LOCK_RELEASE(l);
 
         lock_res(res);
         *reply_lvb = *res_lvb;
 
         lock_res(res);
         *reply_lvb = *res_lvb;
index 34790c5..22effa0 100644 (file)
@@ -107,7 +107,8 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
                [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb)
        };
                [DLM_LOCKREPLY_OFF]   = sizeof(*rep),
                [DLM_REPLY_REC_OFF]   = sizeof(*reply_lvb)
        };
-
+       struct ldlm_glimpse_work         gl_work;
+       CFS_LIST_HEAD(gl_list);
        ENTRY;
 
        policy = ldlm_get_processing_policy(res);
        ENTRY;
 
        policy = ldlm_get_processing_policy(res);
@@ -226,8 +227,20 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
                goto out;
        }
 
                goto out;
        }
 
-       LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
-       rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
+       /* Populate the gl_work structure.
+        * Grab additional reference on the lock which will be released in
+        * ldlm_work_gl_ast_lock() */
+       gl_work.gl_lock = LDLM_LOCK_GET(l);
+       /* The glimpse callback is sent to one single extent lock. As a result,
+        * the gl_work list is just composed of one element */
+       cfs_list_add_tail(&gl_work.gl_list, &gl_list);
+       /* the ldlm_glimpse_work structure is allocated on the stack */
+       gl_work.gl_flags = LDLM_GL_WORK_NOFREE;
+
+       rc = ldlm_glimpse_locks(res, &gl_list); /* this will update the LVB */
+
+       if (!cfs_list_empty(&gl_list))
+               LDLM_LOCK_RELEASE(l);
 
        lock_res(res);
        *reply_lvb = *res_lvb;
 
        lock_res(res);
        *reply_lvb = *res_lvb;