Whamcloud - gitweb
LU-1842 ldlm: support for sending GL ASTs to multiple locks
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 740eae7..6a6eabc 100644 (file)
@@ -699,22 +699,54 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         ENTRY;
 
         LASSERT(lock != NULL);
         ENTRY;
 
         LASSERT(lock != NULL);
-        if (rc != 0) {
-                rc = ldlm_handle_ast_error(lock, req, rc,
-                                           arg->type == LDLM_BL_CALLBACK
-                                           ? "blocking" : "completion");
-                if (rc == -ERESTART)
-                        cfs_atomic_inc(&arg->restart);
-        }
+
+       switch (arg->type) {
+       case LDLM_GL_CALLBACK:
+               /* Update the LVB from disk if the AST failed
+                * (this is a legal race)
+                *
+                * - Glimpse callback of local lock just returns
+                *   -ELDLM_NO_LOCK_DATA.
+                * - Glimpse callback of remote lock might return
+                *   -ELDLM_NO_LOCK_DATA when inode is cleared. LU-274
+                */
+               if (rc == -ELDLM_NO_LOCK_DATA) {
+                       LDLM_DEBUG(lock, "lost race - client has a lock but no "
+                                  "inode");
+                       ldlm_res_lvbo_update(lock->l_resource, NULL, 1);
+               } else if (rc != 0) {
+                       rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
+               } else {
+                       rc = ldlm_res_lvbo_update(lock->l_resource, req, 1);
+               }
+               break;
+       case LDLM_BL_CALLBACK:
+               if (rc != 0)
+                       rc = ldlm_handle_ast_error(lock, req, rc, "blocking");
+               break;
+       case LDLM_CP_CALLBACK:
+               if (rc != 0)
+                       rc = ldlm_handle_ast_error(lock, req, rc, "completion");
+               break;
+       default:
+               LDLM_ERROR(lock, "invalid opcode for lock callback %d",
+                          arg->type);
+               LBUG();
+       }
+
+       /* release extra reference taken in ldlm_ast_fini() */
         LDLM_LOCK_RELEASE(lock);
 
         LDLM_LOCK_RELEASE(lock);
 
+       if (rc == -ERESTART)
+               cfs_atomic_inc(&arg->restart);
+
         RETURN(0);
 }
 
         RETURN(0);
 }
 
-static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
-                                          struct ldlm_cb_set_arg *arg,
-                                          struct ldlm_lock *lock,
-                                          int instant_cancel)
+static inline int ldlm_ast_fini(struct ptlrpc_request *req,
+                               struct ldlm_cb_set_arg *arg,
+                               struct ldlm_lock *lock,
+                               int instant_cancel)
 {
        int rc = 0;
        ENTRY;
 {
        int rc = 0;
        ENTRY;
@@ -853,7 +885,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
 
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+       rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
 
         RETURN(rc);
 }
@@ -968,17 +1000,18 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
 
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
+       rc = ldlm_ast_fini(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
 
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
 
         RETURN(rc);
 }
 
 int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 {
-        struct ldlm_resource  *res = lock->l_resource;
-        struct ldlm_request   *body;
-        struct ptlrpc_request *req;
-        int                    rc;
+       struct ldlm_cb_set_arg          *arg = data;
+       struct ldlm_request             *body;
+       struct ptlrpc_request           *req;
+       struct ldlm_cb_async_args       *ca;
+       int                              rc;
         ENTRY;
 
         LASSERT(lock != NULL);
         ENTRY;
 
         LASSERT(lock != NULL);
@@ -994,44 +1027,44 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
         body->lock_handle[0] = lock->l_remote_handle;
         ldlm_lock2desc(lock, &body->lock_desc);
 
         body->lock_handle[0] = lock->l_remote_handle;
         ldlm_lock2desc(lock, &body->lock_desc);
 
+       CLASSERT(sizeof(*ca) <= sizeof(req->rq_async_args));
+       ca = ptlrpc_req_async_args(req);
+       ca->ca_set_arg = arg;
+       ca->ca_lock = lock;
+
         /* server namespace, doesn't need lock */
         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                              lock->l_resource->lr_lvb_len);
         /* server namespace, doesn't need lock */
         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
                              lock->l_resource->lr_lvb_len);
-        res = lock->l_resource;
         ptlrpc_request_set_replen(req);
 
         ptlrpc_request_set_replen(req);
 
-
         req->rq_send_state = LUSTRE_IMP_FULL;
         /* ptlrpc_request_alloc_pack already set timeout */
         if (AT_OFF)
                 req->rq_timeout = ldlm_get_rq_timeout();
 
         req->rq_send_state = LUSTRE_IMP_FULL;
         /* ptlrpc_request_alloc_pack already set timeout */
         if (AT_OFF)
                 req->rq_timeout = ldlm_get_rq_timeout();
 
+       req->rq_interpret_reply = ldlm_cb_interpret;
+
         if (lock->l_export && lock->l_export->exp_nid_stats &&
             lock->l_export->exp_nid_stats->nid_ldlm_stats)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
 
         if (lock->l_export && lock->l_export->exp_nid_stats &&
             lock->l_export->exp_nid_stats->nid_ldlm_stats)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ptlrpc_queue_wait(req);
-        /* Update the LVB from disk if the AST failed (this is a legal race)
-         *
-         * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA.
-         * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
-         *   when inode is cleared. LU-274
-         */
-        if (rc == -ELDLM_NO_LOCK_DATA) {
-                LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
-                ldlm_res_lvbo_update(res, NULL, 1);
-        } else if (rc != 0) {
-                rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
-        } else {
-                rc = ldlm_res_lvbo_update(res, req, 1);
-        }
+       rc = ldlm_ast_fini(req, arg, lock, 0);
 
 
-        ptlrpc_req_finished(req);
-        if (rc == -ERESTART)
-                ldlm_reprocess_all(res);
+       RETURN(rc);
+}
 
 
-        RETURN(rc);
+int ldlm_glimpse_locks(struct ldlm_resource *res, cfs_list_t *gl_work_list)
+{
+       int     rc;
+       ENTRY;
+
+       rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
+                              LDLM_WORK_GL_AST);
+       if (rc == -ERESTART)
+               ldlm_reprocess_all(res);
+
+       RETURN(rc);
 }
 
 static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
 }
 
 static void ldlm_svc_get_eopc(const struct ldlm_request *dlm_req,
@@ -2922,6 +2955,7 @@ EXPORT_SYMBOL(ldlm_cli_cancel_list);
 EXPORT_SYMBOL(ldlm_server_blocking_ast);
 EXPORT_SYMBOL(ldlm_server_completion_ast);
 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
 EXPORT_SYMBOL(ldlm_server_blocking_ast);
 EXPORT_SYMBOL(ldlm_server_completion_ast);
 EXPORT_SYMBOL(ldlm_server_glimpse_ast);
+EXPORT_SYMBOL(ldlm_glimpse_locks);
 EXPORT_SYMBOL(ldlm_handle_enqueue);
 EXPORT_SYMBOL(ldlm_handle_enqueue0);
 EXPORT_SYMBOL(ldlm_handle_cancel);
 EXPORT_SYMBOL(ldlm_handle_enqueue);
 EXPORT_SYMBOL(ldlm_handle_enqueue0);
 EXPORT_SYMBOL(ldlm_handle_cancel);