ENTRY;
LASSERT(!(*flags & LDLM_FL_REPLAY));
- if (unlikely(ns->ns_client)) {
+ if (unlikely(ns_is_client(ns))) {
CERROR("Trying to enqueue local lock in a shadow namespace\n");
LBUG();
}
/* PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
* a single page on the send/receive side. XXX: 512 should be changed
* to more adequate value. */
-#define ldlm_req_handles_avail(exp, size, bufcount, off) \
-({ \
- int _avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512); \
- int _s = size[DLM_LOCKREQ_OFF]; \
- size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request); \
- _avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic, \
- bufcount, size); \
- _avail /= sizeof(struct lustre_handle); \
- _avail += LDLM_LOCKREQ_HANDLES - off; \
- size[DLM_LOCKREQ_OFF] = _s; \
- _avail; \
-})
+static inline int ldlm_req_handles_avail(struct obd_export *exp,
+ int *size, int bufcount, int off)
+{
+ int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);
+ int old_size = size[DLM_LOCKREQ_OFF];
+
+ size[DLM_LOCKREQ_OFF] = sizeof(struct ldlm_request);
+ avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
+ bufcount, size);
+ avail /= sizeof(struct lustre_handle);
+ avail += LDLM_LOCKREQ_HANDLES - off;
+ size[DLM_LOCKREQ_OFF] = old_size;
+
+ return avail;
+}
+
+static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
+{
+ int size[2] = { sizeof(struct ptlrpc_body),
+ sizeof(struct ldlm_request) };
+ return ldlm_req_handles_avail(exp, size, 2, 0);
+}
/* Cancel lru locks and pack them into the enqueue request. Pack there the given
* @count locks in @cancels. */
* EARLY_CANCEL. Otherwise we have to send extra CANCEL
* rpc right on enqueue, what will make it slower, vs.
* asynchronous rpc in blocking thread. */
- count += ldlm_cancel_lru_local(ns, cancels, 1, avail - count,
- LDLM_CANCEL_AGED);
+ count += ldlm_cancel_lru_local(ns, cancels,
+ exp_connect_lru_resize(exp) ? 0 : 1,
+ avail - count, LDLM_CANCEL_AGED);
size[DLM_LOCKREQ_OFF] =
ldlm_request_bufsize(count, LDLM_ENQUEUE);
}
struct ldlm_resource *res;
int rc;
ENTRY;
- if (lock->l_resource->lr_namespace->ns_client) {
+ if (ns_is_client(lock->l_resource->lr_namespace)) {
CERROR("Trying to cancel local lock\n");
LBUG();
}
}
ldlm_lock_cancel(lock);
} else {
- if (lock->l_resource->lr_namespace->ns_client) {
+ if (ns_is_client(lock->l_resource->lr_namespace)) {
LDLM_ERROR(lock, "Trying to cancel local lock");
LBUG();
}
while (1) {
imp = class_exp2cliimp(exp);
if (imp == NULL || imp->imp_invalid) {
- CDEBUG(D_HA, "skipping cancel on invalid import %p\n",
- imp);
- break;
+ CDEBUG(D_DLMTRACE,
+ "skipping cancel on invalid import %p\n", imp);
+ RETURN(count);
}
req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
"out of sync -- not fatal\n",
libcfs_nid2str(req->rq_import->
imp_connection->c_peer.nid));
+ rc = 0;
} else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
req->rq_import_generation == imp->imp_generation) {
ptlrpc_req_finished(req);
return sent ? sent : rc;
}
+static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
+{
+ LASSERT(imp != NULL);
+ return &imp->imp_obd->obd_namespace->ns_pool;
+}
+
+int ldlm_cli_update_pool(struct ptlrpc_request *req)
+{
+ struct ldlm_pool *pl;
+ ENTRY;
+
+ if (!imp_connect_lru_resize(req->rq_import))
+ RETURN(0);
+
+ if (lustre_msg_get_slv(req->rq_repmsg) == 0 ||
+ lustre_msg_get_limit(req->rq_repmsg) == 0)
+ RETURN(0);
+
+ pl = ldlm_imp2pl(req->rq_import);
+
+ spin_lock(&pl->pl_lock);
+#ifdef __KERNEL__
+ {
+ __u64 old_slv, fast_slv_change;
+
+ old_slv = ldlm_pool_get_slv(pl);
+ fast_slv_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
+ do_div(fast_slv_change, 100);
+#endif
+ pl->pl_update_time = cfs_time_current();
+ ldlm_pool_set_slv(pl, lustre_msg_get_slv(req->rq_repmsg));
+ ldlm_pool_set_limit(pl, lustre_msg_get_limit(req->rq_repmsg));
+#ifdef __KERNEL__
+ /* Wake up pools thread only if SLV has changed more than
+ * 5% since last update. In this case we want to react asap.
+ * Otherwise it is no sense to wake up pools as they are
+ * re-calculated every 1s anyways. */
+ if (old_slv > ldlm_pool_get_slv(pl) &&
+ old_slv - ldlm_pool_get_slv(pl) > fast_slv_change)
+ ldlm_pools_wakeup();
+ }
+#endif
+ spin_unlock(&pl->pl_lock);
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_cli_update_pool);
+
int ldlm_cli_cancel(struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
- CFS_LIST_HEAD(head);
+ CFS_LIST_HEAD(cancels);
int rc = 0;
ENTRY;
}
rc = ldlm_cli_cancel_local(lock);
- if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
- GOTO(out, rc);
-
- list_add(&lock->l_bl_ast, &head);
- rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1, 0);
- EXIT;
-out:
- LDLM_LOCK_PUT(lock);
- return rc < 0 ? rc : 0;
+ list_add(&lock->l_bl_ast, &cancels);
+
+ if (rc == LDLM_FL_BL_AST) {
+ rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1, 0);
+ } else if (rc == LDLM_FL_CANCELING) {
+ int avail = ldlm_cancel_handles_avail(lock->l_conn_export);
+ int count = 1;
+ LASSERT(avail > 0);
+ count += ldlm_cancel_lru_local(lock->l_resource->lr_namespace,
+ &cancels, 0, avail - 1,
+ LDLM_CANCEL_AGED);
+ ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+ }
+ if (rc != LDLM_FL_CANCELING)
+ LDLM_LOCK_PUT(lock);
+ RETURN(rc < 0 ? rc : 0);
}
/* - Free space in lru for @count new locks,
int count, int max, int flags)
{
cfs_time_t cur = cfs_time_current();
+ int rc, added = 0, left, unused;
struct ldlm_lock *lock, *next;
- int rc, added = 0, left;
+ __u64 slv, lvf, lv;
ENTRY;
spin_lock(&ns->ns_unused_lock);
- count += ns->ns_nr_unused - ns->ns_max_unused;
+ unused = ns->ns_nr_unused;
+
+ if (!ns_connect_lru_resize(ns))
+ count += unused - ns->ns_max_unused;
+
while (!list_empty(&ns->ns_unused_list)) {
+ struct ldlm_pool *pl = &ns->ns_pool;
+
+ LASSERT(unused >= 0);
+
if (max && added >= max)
break;
if (&lock->l_lru == &ns->ns_unused_list)
break;
- if ((added >= count) &&
- (!(flags & LDLM_CANCEL_AGED) ||
- cfs_time_before_64(cur, (__u64)ns->ns_max_age +
- lock->l_last_used)))
- break;
+ if (ns_connect_lru_resize(ns)) {
+ cfs_time_t la;
+
+ /* Take into account SLV only if cpount == 0. */
+ if (count == 0) {
+ /* Calculate lv for every lock. */
+ spin_lock(&pl->pl_lock);
+ slv = ldlm_pool_get_slv(pl);
+ lvf = atomic_read(&pl->pl_lock_volume_factor);
+ spin_unlock(&pl->pl_lock);
+
+ la = cfs_duration_sec(cfs_time_sub(cur,
+ lock->l_last_used));
+ if (la == 0)
+ la = 1;
+
+ /* Stop when slv is not yet come from server
+ * or lv is smaller than it is. */
+ lv = lvf * la * unused;
+ if (slv == 1 || lv < slv)
+ break;
+ } else {
+ if (added >= count)
+ break;
+ }
+ } else {
+ if ((added >= count) &&
+ (!(flags & LDLM_CANCEL_AGED) ||
+ cfs_time_before_64(cur, ns->ns_max_age +
+ lock->l_last_used)))
+ break;
+ }
LDLM_LOCK_GET(lock); /* dropped by bl thread */
spin_unlock(&ns->ns_unused_lock);
unlock_res_and_lock(lock);
spin_lock(&ns->ns_unused_lock);
added++;
+ unused--;
}
spin_unlock(&ns->ns_unused_lock);
* in a thread and this function will return after the thread has been
* asked to call the callback. when called with LDLM_SYNC the blocking
* callback will be performed in this function. */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync)
{
CFS_LIST_HEAD(cancels);
int count, rc;
#ifndef __KERNEL__
sync = LDLM_SYNC; /* force to be sync in user space */
#endif
- count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0);
+ count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0);
if (sync == LDLM_ASYNC) {
- struct ldlm_lock *lock, *next;
- list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) {
- /* Remove from the list to allow blocking thread to
- * re-use l_bl_ast. */
- list_del_init(&lock->l_bl_ast);
- rc = ldlm_bl_to_thread(ns, NULL, lock,
- LDLM_FL_CANCELING);
- if (rc)
- list_add_tail(&lock->l_bl_ast, &next->l_bl_ast);
- }
+ rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count);
+ if (rc == 0)
+ RETURN(count);
}
- /* If some locks are left in the list in ASYNC mode, or
+ /* If an error occured in ASYNC mode, or
* this is SYNC mode, cancel the list. */
- ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF, 0);
- RETURN(0);
+ ldlm_cli_cancel_list(&cancels, count, NULL, 0, 0);
+ RETURN(count);
}
/* Find and cancel locally unused locks found on resource, matched to the
/* See CBPENDING comment in ldlm_cancel_lru */
lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_CANCELING |
- lock_flags;
+ lock_flags;
LASSERT(list_empty(&lock->l_bl_ast));
list_add(&lock->l_bl_ast, cancels);
ldlm_cancel_pack(req, off, cancels, count);
else
res = ldlm_cli_cancel_req(lock->l_conn_export,
- cancels, count, flags);
+ cancels, count,
+ flags);
} else {
res = ldlm_cli_cancel_req(lock->l_conn_export,
cancels, 1, flags);
count = ldlm_cancel_resource_local(res, &cancels, policy, mode,
0, flags, opaque);
- rc = ldlm_cli_cancel_list(&cancels, count, NULL,
- DLM_LOCKREQ_OFF, flags);
+ rc = ldlm_cli_cancel_list(&cancels, count, NULL, 0, flags);
if (rc != ELDLM_OK)
CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc);
int count = 0;
ENTRY;
- LASSERT(ns->ns_client == LDLM_NAMESPACE_CLIENT);
+ LASSERT(ns_is_client(ns));
res = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);
if (res == NULL)
!lock->l_readers && !lock->l_writers &&
!(lock->l_flags & LDLM_FL_LOCAL) &&
!(lock->l_flags & LDLM_FL_CBPENDING)) {
- lock->l_last_used = cfs_time_current();
- spin_lock(&ns->ns_unused_lock);
- LASSERT(ns->ns_nr_unused >= 0);
- list_add_tail(&lock->l_lru, &ns->ns_unused_list);
- ns->ns_nr_unused++;
- spin_unlock(&ns->ns_unused_lock);
+ ldlm_lock_add_to_lru(lock);
lock->l_flags &= ~LDLM_FL_NO_LRU;
LDLM_DEBUG(lock, "join lock to lru");
count++;