b=13872
diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c
index 25b742b..8aff2f6 100644
@@ -245,7 +245,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
         ENTRY;
 
         LASSERT(!(*flags & LDLM_FL_REPLAY));
-        if (unlikely(ns->ns_client)) {
+        if (unlikely(ns_is_client(ns))) {
                 CERROR("Trying to enqueue local lock in a shadow namespace\n");
                 LBUG();
         }
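
ns_is_client() replaces direct pokes at ns->ns_client throughout this patch.
Its definition is not part of this file; judging by the ldlm_cli_join_lru()
hunk below, which swaps "LASSERT(ns->ns_client == LDLM_NAMESPACE_CLIENT)" for
"LASSERT(ns_is_client(ns))", the accessor presumably reduces to the following
sketch (the real helper lives in the ldlm headers):

    /* Hedged reconstruction, not the patch's actual definition. */
    static inline int ns_is_client(struct ldlm_namespace *ns)
    {
            return ns->ns_client == LDLM_NAMESPACE_CLIENT;
    }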
@@ -473,18 +473,28 @@ cleanup:
 /* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
  * a single page on the send/receive side. XXX: 512 should be changed
  * to a more adequate value. */
-#define ldlm_req_handles_avail(exp, size, bufcount, off)                \
-({                                                                      \
-        int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);      \
-        int _s = size[DLM_LOCKREQ_OFF];                                 \
-        size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);            \
-        _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \
-                                bufcount, size);                        \
-        _avail /= sizeof(struct lustre_handle);                         \
-        _avail += LDLM_LOCKREQ_HANDLES - off;                           \
-        size[DLM_LOCKREQ_OFF] = _s;                                     \
-        _avail;                                                         \
-})
+static inline int ldlm_req_handles_avail(struct obd_export *exp,
+                                         int *size, int bufcount, int off)
+{
+        int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
+        int old_size = size[DLM_LOCKREQ_OFF];
+
+        size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+        avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
+                                 bufcount, size);
+        avail /= sizeof(struct lustre_handle);
+        avail += LDLM_LOCKREQ_HANDLES - off;
+        size[DLM_LOCKREQ_OFF] = old_size;
+
+        return avail;
+}
+
+static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
+{
+        int size[2] = { sizeof(struct ptlrpc_body),
+                        sizeof(struct ldlm_request) };
+        return ldlm_req_handles_avail(exp, size, 2, 0);
+}
 
 /* Cancel LRU locks and pack them into the enqueue request, along with
  * the given @count locks in @cancels. */
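
The macro-to-function conversion above keeps the same budget arithmetic but
gains argument type checking and single evaluation. The calculation itself:
start from a one-page ceiling (capped by LDLM_MAXREQSIZE), subtract the
message header with a minimal ldlm_request body, and turn the leftover bytes
into extra lock handles. A standalone model with assumed sizes (the real
numbers come from lustre_msg_size() and the wire structs):

    #include <stdio.h>

    enum {
            PAGE_BUDGET    = 4096 - 512, /* PAGE_SIZE minus TCP/IP + LNET headroom */
            MSG_OVERHEAD   = 328,        /* assumed header + minimal ldlm_request  */
            HANDLE_SIZE    = 8,          /* assumed sizeof(struct lustre_handle)   */
            INLINE_HANDLES = 1,          /* LDLM_LOCKREQ_HANDLES in the request    */
    };

    int main(void)
    {
            int avail = (PAGE_BUDGET - MSG_OVERHEAD) / HANDLE_SIZE + INLINE_HANDLES;

            printf("handles per single-page cancel RPC: %d\n", avail);
            return 0;
    }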
@@ -511,8 +521,9 @@ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                 * EARLY_CANCEL. Otherwise we would have to send an extra
                 * CANCEL RPC right at enqueue time, which is slower than an
                 * asynchronous RPC from the blocking thread. */
-                count += ldlm_cancel_lru_local(ns, cancels, 1, avail - count,
-                                               LDLM_CANCEL_AGED);
+                count += ldlm_cancel_lru_local(ns, cancels,
+                                               exp_connect_lru_resize(exp) ? 0 : 1,
+                                               avail - count, LDLM_CANCEL_AGED);
                 size[DLM_LOCKREQ_OFF] =
                         ldlm_request_bufsize(count, LDLM_ENQUEUE);
         }
@@ -675,7 +686,7 @@ static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
         struct ldlm_resource *res;
         int rc;
         ENTRY;
-        if (lock->l_resource->lr_namespace->ns_client) {
+        if (ns_is_client(lock->l_resource->lr_namespace)) {
                 CERROR("Trying to cancel local lock\n");
                 LBUG();
         }
@@ -801,7 +812,7 @@ static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
                 }
                 ldlm_lock_cancel(lock);
         } else {
-                if (lock->l_resource->lr_namespace->ns_client) {
+                if (ns_is_client(lock->l_resource->lr_namespace)) {
                         LDLM_ERROR(lock, "Trying to cancel local lock");
                         LBUG();
                 }
@@ -878,9 +889,9 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
         while (1) {
                 imp = class_exp2cliimp(exp);
                 if (imp == NULL || imp->imp_invalid) {
-                        CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
-                               imp);
-                        break;
+                        CDEBUG(D_DLMTRACE,
+                               "skipping cancel on invalid import %p\n", imp);
+                        RETURN(count);
                 }
 
                 req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
@@ -912,6 +923,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, struct list_head *cancels,
                                "out of sync -- not fatal\n",
                                libcfs_nid2str(req->rq_import->
                                               imp_connection->c_peer.nid));
+                        rc = 0;
                 } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
                            req->rq_import_generation == imp->imp_generation) {
                         ptlrpc_req_finished(req);
@@ -931,10 +943,58 @@ out:
         return sent ? sent : rc;
 }
 
+static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
+{
+        LASSERT(imp != NULL);
+        return &imp->imp_obd->obd_namespace->ns_pool;
+}
+
+int ldlm_cli_update_pool(struct ptlrpc_request *req)
+{
+        struct ldlm_pool *pl;
+        ENTRY;
+    
+        if (!imp_connect_lru_resize(req->rq_import))
+                RETURN(0);
+
+        if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
+            lustre_msg_get_limit(req->rq_repmsg) == 0)
+                RETURN(0);
+
+        pl = ldlm_imp2pl(req->rq_import);
+        
+        spin_lock(&pl->pl_lock);
+#ifdef __KERNEL__
+        {
+                __u64 old_slv, fast_slv_change;
+
+                old_slv = ldlm_pool_get_slv(pl);
+                fast_slv_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
+                do_div(fast_slv_change, 100);
+#endif
+                pl->pl_update_time = cfs_time_current();
+                ldlm_pool_set_slv(pl, lustre_msg_get_slv(req->rq_repmsg));
+                ldlm_pool_set_limit(pl, lustre_msg_get_limit(req->rq_repmsg));
+#ifdef __KERNEL__
+                /* Wake up the pools thread only if the SLV has changed by
+                 * more than 5% since the last update; in that case we want
+                 * to react asap. Otherwise there is no point in waking the
+                 * pools thread, as pools are recalculated every 1s anyway. */
+                if (old_slv > ldlm_pool_get_slv(pl) && 
+                    old_slv - ldlm_pool_get_slv(pl) > fast_slv_change)
+                        ldlm_pools_wakeup();
+        }
+#endif
+        spin_unlock(&pl->pl_lock);
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_cli_update_pool);
+
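
ldlm_cli_update_pool() above is the client half of the lru-resize protocol:
every reply carries the server lock volume (SLV) and limit, and the pools
thread is woken early only on a fast SLV drop. The wakeup test can be read as
a pure threshold function; a standalone model, assuming
LDLM_POOLS_FAST_SLV_CHANGE is the percentage 5, as the "5%" comment implies:

    #include <stdint.h>

    #define LDLM_POOLS_FAST_SLV_CHANGE 5 /* percent; assumed from the comment */

    /* Nonzero when the SLV dropped fast enough since the last update to
     * justify waking the pools thread before its regular 1s recalculation. */
    static int slv_dropped_fast(uint64_t old_slv, uint64_t new_slv)
    {
            uint64_t threshold = old_slv * LDLM_POOLS_FAST_SLV_CHANGE / 100;

            return old_slv > new_slv && old_slv - new_slv > threshold;
    }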
 int ldlm_cli_cancel(struct lustre_handle *lockh)
 {
         struct ldlm_lock *lock;
-        CFS_LIST_HEAD(head);
+        CFS_LIST_HEAD(cancels);
         int rc = 0;
         ENTRY;
 
@@ -946,15 +1006,22 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         }
         
         rc = ldlm_cli_cancel_local(lock);
-        if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
-                GOTO(out, rc);
-
-        list_add(&lock->l_bl_ast, &head);
-        rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
-        EXIT;
-out:
-        LDLM_LOCK_PUT(lock);
-        return rc < 0 ? rc : 0;
+        list_add(&lock->l_bl_ast, &cancels);
+
+        if (rc == LDLM_FL_BL_AST) {
+                rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1, 0);
+        } else if (rc == LDLM_FL_CANCELING) {
+                int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+                int count = 1;
+                LASSERT(avail > 0);
+                count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace,
+                                               &cancels, 0, avail - 1,
+                                               LDLM_CANCEL_AGED);
+                ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        }
+        if (rc != LDLM_FL_CANCELING)
+                LDLM_LOCK_PUT(lock);
+        RETURN(rc < 0 ? rc : 0);
 }
 
 /* - Free space in lru for @count new locks,
@@ -973,13 +1040,22 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                           int count, int max, int flags)
 {
         cfs_time_t cur = cfs_time_current();
+        int rc, added = 0, left, unused;
         struct ldlm_lock *lock, *next;
-        int rc, added = 0, left;
+        __u64 slv, lvf, lv;
         ENTRY;
 
         spin_lock(&ns->ns_unused_lock);
-        count += ns->ns_nr_unused - ns->ns_max_unused;
+        unused = ns->ns_nr_unused;
+        
+        if (!ns_connect_lru_resize(ns))
+                count += unused - ns->ns_max_unused;
+
         while (!list_empty(&ns->ns_unused_list)) {
+                struct ldlm_pool *pl = &ns->ns_pool;
+
+                LASSERT(unused >= 0);
+
                 if (max && added >= max)
                         break;
 
@@ -993,11 +1069,38 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                 if (&lock->l_lru == &ns->ns_unused_list)
                         break;
 
-                if ((added >= count) && 
-                    (!(flags & LDLM_CANCEL_AGED) ||
-                     cfs_time_before_64(cur, (__u64)ns->ns_max_age +
-                                        lock->l_last_used)))
-                        break;
+                if (ns_connect_lru_resize(ns)) {
+                        cfs_time_t la;
+                        
+                        /* Take into account SLV only if count == 0. */
+                        if (count == 0) {
+                                /* Calculate lv for every lock. */
+                                spin_lock(&pl->pl_lock);
+                                slv = ldlm_pool_get_slv(pl);
+                                lvf = atomic_read(&pl->pl_lock_volume_factor);
+                                spin_unlock(&pl->pl_lock);
+
+                                la = cfs_duration_sec(cfs_time_sub(cur, 
+                                                      lock->l_last_used));
+                                if (la == 0)
+                                        la = 1;
+                                
+                                /* Stop when no SLV has arrived from the server
+                                 * yet, or when lv is smaller than the SLV. */
+                                lv = lvf * la * unused;
+                                if (slv == 1 || lv < slv)
+                                        break;
+                        } else {
+                                if (added >= count)
+                                        break;
+                        }
+                } else {
+                        if ((added >= count) && 
+                            (!(flags & LDLM_CANCEL_AGED) ||
+                             cfs_time_before_64(cur, ns->ns_max_age +
+                                                lock->l_last_used)))
+                                break;
+                }
 
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 spin_unlock(&ns->ns_unused_lock);
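
This loop is the core of lru resize, and it explains the count argument change
in the ldlm_prep_enqueue_req() hunk above: with lru_resize connected, count is
0 and the scan is driven purely by lock volume. Each unused lock gets a volume
lv = lvf * age * unused that grows with its idle time and with the length of
the unused list; scanning stops once lv falls below the server-supplied SLV
(slv == 1 is the "no SLV received yet" sentinel). A standalone model of the
per-lock decision:

    #include <stdint.h>

    /* Keep cancelling while the lock volume stays at or above the SLV. */
    static int keep_scanning(uint64_t slv, uint64_t lvf, uint64_t idle_sec,
                             unsigned int unused)
    {
            uint64_t lv;

            if (idle_sec == 0)      /* the loop clamps a zero age to 1s */
                    idle_sec = 1;
            lv = lvf * idle_sec * unused;

            return slv != 1 && lv >= slv;
    }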
@@ -1042,6 +1145,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                 unlock_res_and_lock(lock);
                 spin_lock(&ns->ns_unused_lock);
                 added++;
+                unused--;
         }
         spin_unlock(&ns->ns_unused_lock);
 
@@ -1076,7 +1180,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
  * in a thread and this function will return after the thread has been
  * asked to call the callback.  When called with LDLM_SYNC, the blocking
  * callback will be performed in this function. */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync)
 {
         CFS_LIST_HEAD(cancels);
         int count, rc;
@@ -1085,24 +1189,17 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
 #ifndef __KERNEL__
         sync = LDLM_SYNC; /* force to be sync in user space */
 #endif
-        count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0);
+        count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0);
         if (sync == LDLM_ASYNC) {
-                struct ldlm_lock *lock, *next;
-                list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) {
-                        /* Remove from the list to allow blocking thread to
-                         * re-use l_bl_ast. */
-                        list_del_init(&lock->l_bl_ast);
-                        rc = ldlm_bl_to_thread(ns, NULL, lock,
-                                               LDLM_FL_CANCELING);
-                        if (rc)
-                                list_add_tail(&lock->l_bl_ast, &next->l_bl_ast);
-                }
+                rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
+                if (rc == 0)
+                        RETURN(count);
         }
 
-        /* If some locks are left in the list in ASYNC mode, or
+        /* If an error occurred in ASYNC mode, or
          * this is SYNC mode, cancel the list. */
-        ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF, 0);
-        RETURN(0);
+        ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+        RETURN(count);
 }
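
The ASYNC path above now hands the whole cancel list to the blocking threads
with one ldlm_bl_to_thread_list() call instead of requeueing locks one at a
time (and re-adding them on failure). The essence of that handoff is an O(1)
list splice into a single work item; a toy standalone model (plain C lists,
not Lustre's cfs lists or the real ldlm_bl_to_thread_list() body, which this
diff does not show):

    #include <assert.h>

    /* Toy circular doubly linked list with a sentinel head. */
    struct node { struct node *prev, *next; };

    static void list_init(struct node *h)        { h->prev = h->next = h; }
    static int  list_empty(const struct node *h) { return h->next == h; }

    /* O(1) splice-at-head: move every entry from src onto dst, leaving
     * src empty -- one work item carries the whole batch. */
    static void splice_init(struct node *src, struct node *dst)
    {
            if (list_empty(src))
                    return;
            src->next->prev = dst;        /* first entry now follows dst   */
            src->prev->next = dst->next;  /* last entry links to old first */
            dst->next->prev = src->prev;  /* old first points back at last */
            dst->next = src->next;        /* dst now heads the moved batch */
            list_init(src);
    }

    int main(void)
    {
            struct node a, b, src, dst;

            list_init(&src); list_init(&dst);
            a.prev = &src; a.next = &b;   /* src holds a <-> b */
            b.prev = &a;   b.next = &src;
            src.next = &a; src.prev = &b;

            splice_init(&src, &dst);
            assert(list_empty(&src) && dst.next == &a && dst.prev == &b);
            return 0;
    }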
 
 /* Find and cancel locally unused locks found on resource, matched to the
@@ -1153,7 +1250,7 @@ int ldlm_cancel_resource_local(struct ldlm_resource *res,
 
                 /* See CBPENDING comment in ldlm_cancel_lru */
                 lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
-                        lock_flags;
+                                 lock_flags;
 
                 LASSERT(list_empty(&lock->l_bl_ast));
                 list_add(&lock->l_bl_ast, cancels);
@@ -1225,7 +1322,8 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count,
                                 ldlm_cancel_pack(req, off, cancels, count);
                         else
                                 res = ldlm_cli_cancel_req(lock->l_conn_export,
-                                                          cancels, count, flags);
+                                                          cancels, count,
+                                                          flags);
                 } else {
                         res = ldlm_cli_cancel_req(lock->l_conn_export,
                                                   cancels, 1, flags);
@@ -1264,8 +1362,7 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns,
 
         count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
                                            0, flags, opaque);
-        rc = ldlm_cli_cancel_list(&cancels, count, NULL,
-                                  DLM_LOCKREQ_OFF, flags);
+        rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
         if (rc != ELDLM_OK)
                 CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
 
@@ -1344,7 +1441,7 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns,
         int count = 0;
         ENTRY;
 
-        LASSERT(ns->ns_client == LDLM_NAMESPACE_CLIENT);
+        LASSERT(ns_is_client(ns));
 
         res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
         if (res == NULL)
@@ -1360,12 +1457,7 @@ int ldlm_cli_join_lru(struct ldlm_namespace *ns,
                     !lock->l_readers && !lock->l_writers &&
                     !(lock->l_flags & LDLM_FL_LOCAL) &&
                     !(lock->l_flags & LDLM_FL_CBPENDING)) {
-                        lock->l_last_used = cfs_time_current();
-                        spin_lock(&ns->ns_unused_lock);
-                        LASSERT(ns->ns_nr_unused >= 0);
-                        list_add_tail(&lock->l_lru, &ns->ns_unused_list);
-                        ns->ns_nr_unused++;
-                        spin_unlock(&ns->ns_unused_lock);
+                        ldlm_lock_add_to_lru(lock);
                         lock->l_flags &= ~LDLM_FL_NO_LRU;
                         LDLM_DEBUG(lock, "join lock to lru");
                         count++;
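
ldlm_lock_add_to_lru() is defined in ldlm_lock.c, not in this diff, but its
body can be read off the open-coded block removed above; a hedged
reconstruction (deriving the namespace from the lock, since the helper no
longer has ns in scope):

    void ldlm_lock_add_to_lru(struct ldlm_lock *lock)
    {
            struct ldlm_namespace *ns = lock->l_resource->lr_namespace;

            lock->l_last_used = cfs_time_current();
            spin_lock(&ns->ns_unused_lock);
            LASSERT(ns->ns_nr_unused >= 0);
            list_add_tail(&lock->l_lru, &ns->ns_unused_list);
            ns->ns_nr_unused++;
            spin_unlock(&ns->ns_unused_lock);
    }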