Description: osts not allocated evenly to files
Details : change the condition to increase offset_idx
+Severity : enhancement
+Bugzilla : 2262
+Description: self-adjusting client lru lists
+Details    : use an adaptive algorithm for managing the client's cached-lock
+	     lru lists according to current server load, other clients'
+	     workload patterns, memory pressure, etc. Both server- and
+	     client-side namespaces provide a number of proc tunables for
+	     controlling this behavior
--------------------------------------------------------------------------------
2007-08-27 Cluster File Systems, Inc. <info@clusterfs.com>
LC_CONFIG_LIBLUSTRE_RECOVERY
LC_CONFIG_QUOTA
LC_CONFIG_HEALTH_CHECK_WRITE
+LC_CONFIG_LRU_RESIZE
LC_TASK_PPTR
# RHEL4 patches
LC_CONFIG_LIBLUSTRE_RECOVERY
])
+AC_DEFUN([LC_CONFIG_LRU_RESIZE],
+[AC_MSG_CHECKING([whether to enable lru self-adjustment])
+AC_ARG_ENABLE([lru_resize],
+ AC_HELP_STRING([--enable-lru-resize],
+ [enable lru resize support]),
+ [],[enable_lru_resize='yes'])
+AC_MSG_RESULT([$enable_lru_resize])
+if test x$enable_lru_resize != xno; then
+ AC_DEFINE(HAVE_LRU_RESIZE_SUPPORT, 1, [Enable lru resize support])
+fi
+])
+
#
# LC_CONFIG_QUOTA
#
extern int lprocfs_rd_u64(char *page, char **start, off_t off,
int count, int *eof, void *data);
extern int lprocfs_rd_atomic(char *page, char **start, off_t off,
- int count, int *eof, void *data);
+ int count, int *eof, void *data);
+extern int lprocfs_wr_atomic(struct file *file, const char *buffer,
+ unsigned long count, void *data);
+extern int lprocfs_rd_uint(char *page, char **start, off_t off,
+ int count, int *eof, void *data);
+extern int lprocfs_wr_uint(struct file *file, const char *buffer,
+ unsigned long count, void *data);
extern int lprocfs_rd_uuid(char *page, char **start, off_t off,
int count, int *eof, void *data);
extern int lprocfs_rd_name(char *page, char **start, off_t off,
__u32 pb_conn_cnt;
__u32 pb_timeout; /* for req, the deadline, for rep, the service est */
__u32 pb_service_time; /* for rep, actual service time */
- __u32 pb_padding_1;
- __u32 pb_padding_2;
- __u32 pb_padding_3;
+ __u32 pb_limit;
+ __u64 pb_slv;
};
extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb);
#define OBD_CONNECT_CANCELSET 0x400000ULL /* Early batched cancels. */
#define OBD_CONNECT_SOM 0x00800000ULL /* Size on MDS */
#define OBD_CONNECT_AT 0x01000000ULL /* client uses adaptive timeouts */
+#define OBD_CONNECT_LRU_RESIZE 0x02000000ULL /* Lru resize feature. */
/* also update obd_connect_names[] for lprocfs_rd_connect_flags()
* and lustre/utils/wirecheck.c */
OBD_CONNECT_ACL | OBD_CONNECT_XATTR | \
OBD_CONNECT_IBITS | OBD_CONNECT_JOIN | \
OBD_CONNECT_NODEVOH | OBD_CONNECT_ATTRFID | \
- OBD_CONNECT_CANCELSET | OBD_CONNECT_AT)
+ OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
+ OBD_CONNECT_LRU_RESIZE)
#define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \
OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \
OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \
OBD_CONNECT_BRW_SIZE | OBD_CONNECT_QUOTA64 | \
- OBD_CONNECT_CANCELSET | OBD_CONNECT_AT)
+ OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \
+ OBD_CONNECT_LRU_RESIZE)
#define ECHO_CONNECT_SUPPORTED (0)
#define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION | OBD_CONNECT_AT)
#define OBD_OCD_VERSION_PATCH(version) ((int)((version)>>8)&255)
#define OBD_OCD_VERSION_FIX(version) ((int)(version)&255)
-#define exp_connect_cancelset(exp) \
- ((exp) ? (exp)->exp_connect_flags & OBD_CONNECT_CANCELSET : 0)
-
/* This structure is used for both request and reply.
*
* If we eventually have separate connect data for different types, which we
ELDLM_BAD_NAMESPACE = 401
} ldlm_error_t;
-#define LDLM_NAMESPACE_SERVER 0
-#define LDLM_NAMESPACE_CLIENT 1
+typedef enum {
+ LDLM_NAMESPACE_SERVER = 0,
+ LDLM_NAMESPACE_CLIENT = 1
+} ldlm_side_t;
#define LDLM_FL_LOCK_CHANGED 0x000001 /* extent, mode, or resource changed */
*
*/
+struct ldlm_pool;
struct ldlm_lock;
struct ldlm_resource;
struct ldlm_namespace;
+typedef int (*ldlm_pool_recalc_t)(struct ldlm_pool *pl);
+
+typedef int (*ldlm_pool_shrink_t)(struct ldlm_pool *pl,
+ int nr, unsigned int gfp_mask);
+
+enum {
+ LDLM_POOL_CTL_RECALC = 1 << 0, /* Pool recalc is enabled */
+ LDLM_POOL_CTL_SHRINK = 1 << 1, /* Pool shrink is enabled */
+ LDLM_POOL_CTL_FULL = (LDLM_POOL_CTL_RECALC | LDLM_POOL_CTL_SHRINK)
+};
+
+/* One second for pools thread check interval. */
+#define LDLM_POOLS_THREAD_PERIOD (1)
+
+/* 5% margin for modest pools. See ldlm_pool.c for details. */
+#define LDLM_POOLS_MODEST_MARGIN (5)
+
+/* A change to SLV in % after which we want to wake up pools thread asap. */
+#define LDLM_POOLS_FAST_SLV_CHANGE (5)
+
+struct ldlm_pool {
+ /* Common pool fields */
+ cfs_proc_dir_entry_t *pl_proc_dir; /* Pool proc directory. */
+ char pl_name[100]; /* Pool name, should be long
+ * enough to contain complex
+ * proc entry name. */
+ spinlock_t pl_lock; /* Lock for protecting slv/clv
+ * updates. */
+ atomic_t pl_limit; /* Number of allowed locks in
+ * pool, both client and
+ * server side. */
+ atomic_t pl_granted; /* Number of granted locks. */
+ atomic_t pl_grant_rate; /* Grant rate per T. */
+ atomic_t pl_cancel_rate; /* Cancel rate per T. */
+ atomic_t pl_grant_speed; /* Grant speed (GR - CR) per T. */
+ __u64 pl_server_lock_volume; /* Server lock volume. Protected
+ * by pl_lock. */
+ cfs_time_t pl_update_time; /* Time when last slv from server
+ * was obtained. */
+ ldlm_pool_recalc_t pl_recalc; /* Recalc callback func pointer. */
+ ldlm_pool_shrink_t pl_shrink; /* Shrink callback func pointer. */
+ int pl_control; /* Pool features mask */
+
+ /* Server side pool fields */
+ atomic_t pl_grant_plan; /* Planned number of granted
+ * locks for next T. */
+ atomic_t pl_grant_step; /* Grant plan step for next T. */
+
+ /* Client side pool related fields */
+ atomic_t pl_lock_volume_factor; /* Lock volume factor. */
+ struct lprocfs_stats *pl_stats; /* Pool statistics. */
+};
+
+static inline int pool_recalc_enabled(struct ldlm_pool *pl)
+{
+ return pl->pl_control & LDLM_POOL_CTL_RECALC;
+}
+
+static inline int pool_shrink_enabled(struct ldlm_pool *pl)
+{
+ return pl->pl_control & LDLM_POOL_CTL_SHRINK;
+}
+
typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **,
void *req_cookie, ldlm_mode_t mode, int flags,
void *data);
int buf_idx, int increase);
};
+typedef enum {
+ LDLM_NAMESPACE_GREEDY = 1 << 0,
+ LDLM_NAMESPACE_MODEST = 1 << 1
+} ldlm_appetite_t;
+
/* default values for the "max_nolock_size", "contention_time"
* and "contended_locks" namespace tunables */
#define NS_DEFAULT_MAX_NOLOCK_BYTES 131072
struct ldlm_namespace {
char *ns_name;
- __u32 ns_client; /* is this a client-side lock tree? */
+ ldlm_side_t ns_client; /* is this a client-side lock tree? */
+ __u64 ns_connect_flags; /* client side connect flags
+ * supported by server */
struct list_head *ns_hash; /* hash table for ns */
spinlock_t ns_hash_lock;
__u32 ns_refcount; /* count of resources in the hash */
struct ldlm_valblock_ops *ns_lvbo;
void *ns_lvbp;
cfs_waitq_t ns_waitq;
+ struct ldlm_pool ns_pool;
+ ldlm_appetite_t ns_appetite;
+
/* if more than @ns_contented_locks found, the resource considered
* as contended */
unsigned ns_contended_locks;
unsigned ns_max_nolock_size;
};
+static inline int ns_connect_lru_resize(struct ldlm_namespace *ns)
+{
+ LASSERT(ns != NULL);
+ return ns->ns_connect_flags & OBD_CONNECT_LRU_RESIZE;
+}
+
/*
*
* Resource hash table
struct ldlm_request *dlm_req, int first);
int ldlm_del_waiting_lock(struct ldlm_lock *lock);
int ldlm_refresh_waiting_lock(struct ldlm_lock *lock);
-int ldlm_get_ref(void);
-void ldlm_put_ref(int force);
+int ldlm_get_ref(ldlm_side_t client);
+void ldlm_put_ref(ldlm_side_t client, int force);
/* ldlm_lock.c */
ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res);
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req);
/* resource.c */
-struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 local);
+struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client,
+ ldlm_appetite_t apt);
int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags);
int ldlm_namespace_free(struct ldlm_namespace *ns, int force);
int ldlm_proc_setup(void);
int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new,
void *data, __u32 data_len);
int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags);
+int ldlm_cli_update_pool(struct ptlrpc_request *req);
int ldlm_cli_cancel(struct lustre_handle *lockh);
int ldlm_cli_cancel_unused(struct ldlm_namespace *, struct ldlm_res_id *,
int flags, void *opaque);
struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock);
void unlock_res_and_lock(struct ldlm_lock *lock);
+/* ldlm_pool.c */
+int ldlm_pools_init(ldlm_side_t client);
+void ldlm_pools_fini(void);
+void ldlm_pools_wakeup(void);
+int ldlm_pools_shrink(int nr, unsigned int gfp_mask);
+
+int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
+ int idx, ldlm_side_t client);
+int ldlm_pool_shrink(struct ldlm_pool *pl, int nr,
+ unsigned int gfp_mask);
+void ldlm_pool_fini(struct ldlm_pool *pl);
+int ldlm_pool_setup(struct ldlm_pool *pl, __u32 limit);
+int ldlm_pool_recalc(struct ldlm_pool *pl);
+__u64 ldlm_pool_get_slv(struct ldlm_pool *pl);
+__u32 ldlm_pool_get_limit(struct ldlm_pool *pl);
+void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv);
+void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit);
+void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock);
+void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock);
#endif
#define exp_filter_data u.eu_filter_data
#define exp_ec_data u.eu_ec_data
+static inline int exp_connect_cancelset(struct obd_export *exp)
+{
+ return exp ? exp->exp_connect_flags & OBD_CONNECT_CANCELSET : 0;
+}
+
+static inline int exp_connect_lru_resize(struct obd_export *exp)
+{
+ LASSERT(exp != NULL);
+ return exp->exp_connect_flags & OBD_CONNECT_LRU_RESIZE;
+}
+
+static inline int imp_connect_lru_resize(struct obd_import *imp)
+{
+ LASSERT(imp != NULL);
+ return imp->imp_connect_data.ocd_connect_flags &
+ OBD_CONNECT_LRU_RESIZE;
+}
+
extern struct obd_export *class_conn2export(struct lustre_handle *conn);
extern struct obd_device *class_conn2obd(struct lustre_handle *conn);
int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp,
struct obd_uuid *cluuid);
int target_handle_ping(struct ptlrpc_request *req);
+int target_pack_pool_reply(struct ptlrpc_request *req);
void target_committed_to_req(struct ptlrpc_request *req);
#ifdef HAVE_QUOTA_SUPPORT
__u64 lustre_msg_get_last_xid(struct lustre_msg *msg);
__u64 lustre_msg_get_last_committed(struct lustre_msg *msg);
__u64 lustre_msg_get_transno(struct lustre_msg *msg);
+__u64 lustre_msg_get_slv(struct lustre_msg *msg);
+__u32 lustre_msg_get_limit(struct lustre_msg *msg);
+void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv);
+void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit);
int lustre_msg_get_status(struct lustre_msg *msg);
__u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg);
int lustre_msg_is_v1(struct lustre_msg *msg);
MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
DIST_SOURCES = ldlm_extent.c ldlm_flock.c ldlm_internal.h ldlm_lib.c \
ldlm_lock.c ldlm_lockd.c ldlm_plain.c ldlm_request.c \
- ldlm_resource.c l_lock.c ldlm_inodebits.c
+ ldlm_resource.c l_lock.c ldlm_inodebits.c ldlm_pool.c
* vim:expandtab:shiftwidth=8:tabstop=8:
*/
+#define MAX_STRING_SIZE 128
+
/* ldlm_request.c */
typedef enum {
LDLM_ASYNC,
/* Cancel lru flag, it indicates we cancel aged locks. */
#define LDLM_CANCEL_AGED 0x00000001
-int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync);
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync);
int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
int count, int max, int flags);
void l_check_no_ns_lock(struct ldlm_namespace *ns);
extern cfs_proc_dir_entry_t *ldlm_svc_proc_dir;
+extern cfs_proc_dir_entry_t *ldlm_type_proc_dir;
struct ldlm_state {
struct ptlrpc_service *ldlm_cb_service;
int ldlm_init(void);
void ldlm_exit(void);
-
} else {
cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
}
- rc = ldlm_get_ref();
+ rc = ldlm_get_ref(LDLM_NAMESPACE_CLIENT);
if (rc) {
CERROR("ldlm_get_ref failed: %d\n", rc);
GOTO(err, rc);
err_import:
class_destroy_import(imp);
err_ldlm:
- ldlm_put_ref(0);
+ ldlm_put_ref(LDLM_NAMESPACE_CLIENT, 0);
err:
RETURN(rc);
int client_obd_cleanup(struct obd_device *obddev)
{
ENTRY;
- ldlm_put_ref(obddev->obd_force);
-
+ ldlm_put_ref(LDLM_NAMESPACE_CLIENT, obddev->obd_force);
RETURN(0);
}
if (obd->obd_namespace != NULL)
CERROR("already have namespace!\n");
obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
- LDLM_NAMESPACE_CLIENT);
+ LDLM_NAMESPACE_CLIENT,
+ LDLM_NAMESPACE_GREEDY);
if (obd->obd_namespace == NULL)
GOTO(out_disco, rc = -ENOMEM);
return 1;
}
+static inline struct ldlm_pool *ldlm_exp2pl(struct obd_export *exp)
+{
+ LASSERT(exp != NULL);
+ return &exp->exp_obd->obd_namespace->ns_pool;
+}
+
+int target_pack_pool_reply(struct ptlrpc_request *req)
+{
+ struct ldlm_pool *pl;
+ ENTRY;
+
+ if (!exp_connect_lru_resize(req->rq_export))
+ RETURN(0);
+
+ pl = ldlm_exp2pl(req->rq_export);
+
+ spin_lock(&pl->pl_lock);
+ lustre_msg_set_slv(req->rq_repmsg, ldlm_pool_get_slv(pl));
+ lustre_msg_set_limit(req->rq_repmsg, ldlm_pool_get_limit(pl));
+ spin_unlock(&pl->pl_lock);
+
+ RETURN(0);
+}
+
int
target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id)
{
DEBUG_REQ(D_NET, req, "sending reply");
}
+ target_pack_pool_reply(req);
return (ptlrpc_send_reply(req, PTLRPC_REPLY_MAYBE_DIFFICULT));
}
LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
}
+
void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
{
struct ldlm_namespace *ns;
ldlm_lock_remove_from_lru(lock);
unlock_res_and_lock(lock);
if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
- ldlm_bl_to_thread(ns, NULL, lock, 0) != 0)
+ ldlm_bl_to_thread(ns, NULL, lock, 0) != 0)
ldlm_handle_bl_callback(ns, NULL, lock);
} else if (ns->ns_client == LDLM_NAMESPACE_CLIENT &&
!lock->l_readers && !lock->l_writers &&
/* Call ldlm_cancel_lru() only if EARLY_CANCEL is not supported
* by the server, otherwise, it is done on enqueue. */
if (!exp_connect_cancelset(lock->l_conn_export))
- ldlm_cancel_lru(ns, LDLM_ASYNC);
+ ldlm_cancel_lru(ns, 0, LDLM_ASYNC);
} else {
unlock_res_and_lock(lock);
}
if (work_list && lock->l_completion_ast != NULL)
ldlm_add_ast_work_item(lock, NULL, work_list);
+ ldlm_pool_add(&res->lr_namespace->ns_pool, lock);
EXIT;
}
ldlm_del_waiting_lock(lock);
ldlm_resource_unlink_lock(lock);
ldlm_lock_destroy_nolock(lock);
+
+ if (lock->l_granted_mode == lock->l_req_mode)
+ ldlm_pool_del(&ns->ns_pool, lock);
+
+ /* Make sure we will not be called again for the same lock, which is
+ * possible if lock->l_granted_mode is not zeroed out. */
+ lock->l_granted_mode = 0;
unlock_res_and_lock(lock);
EXIT;
extern cfs_mem_cache_t *ldlm_lock_slab;
extern struct lustre_lock ldlm_handle_lock;
extern struct list_head ldlm_namespace_list;
-
extern struct semaphore ldlm_namespace_lock;
+
static struct semaphore ldlm_ref_sem;
static int ldlm_refcount;
-/* LDLM state */
-
static struct ldlm_state *ldlm_state;
inline cfs_time_t round_timeout(cfs_time_t timeout)
int i, count, done = 0;
ENTRY;
- LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, "
- "starting at %d", dlm_req->lock_count, first);
count = dlm_req->lock_count ? dlm_req->lock_count : 1;
if (first >= count)
RETURN(0);
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
RETURN(0);
+ LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks",
+ count - first);
for (i = first; i < count; i++) {
lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
if (!lock) {
#endif
-static int ldlm_setup(void);
-static int ldlm_cleanup(int force);
+static int ldlm_setup(ldlm_side_t client);
+static int ldlm_cleanup(ldlm_side_t client, int force);
-int ldlm_get_ref(void)
+int ldlm_get_ref(ldlm_side_t client)
{
int rc = 0;
ENTRY;
mutex_down(&ldlm_ref_sem);
if (++ldlm_refcount == 1) {
- rc = ldlm_setup();
+ rc = ldlm_setup(client);
if (rc)
ldlm_refcount--;
}
RETURN(rc);
}
-void ldlm_put_ref(int force)
+void ldlm_put_ref(ldlm_side_t client, int force)
{
ENTRY;
mutex_down(&ldlm_ref_sem);
if (ldlm_refcount == 1) {
- int rc = ldlm_cleanup(force);
+ int rc = ldlm_cleanup(client, force);
if (rc)
CERROR("ldlm_cleanup failed: %d\n", rc);
else
EXIT;
}
-static int ldlm_setup(void)
+static int ldlm_setup(ldlm_side_t client)
{
struct ldlm_bl_pool *blp;
int rc = 0;
expired_lock_thread.elt_state == ELT_READY);
#endif
+#ifdef __KERNEL__
+ rc = ldlm_pools_init(client);
+ if (rc)
+ GOTO(out_thread, rc);
+#endif
+
RETURN(0);
#ifdef __KERNEL__
return rc;
}
-static int ldlm_cleanup(int force)
+static int ldlm_cleanup(ldlm_side_t client, int force)
{
#ifdef __KERNEL__
struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
}
#ifdef __KERNEL__
+ ldlm_pools_fini();
+#endif
+
+#ifdef __KERNEL__
while (atomic_read(&blp->blp_num_threads) > 0) {
struct ldlm_bl_work_item blwi = { .blwi_ns = NULL };
EXPORT_SYMBOL(target_send_reply);
EXPORT_SYMBOL(target_queue_recovery_request);
EXPORT_SYMBOL(target_handle_ping);
+EXPORT_SYMBOL(target_pack_pool_reply);
EXPORT_SYMBOL(target_handle_disconnect);
EXPORT_SYMBOL(target_queue_final_reply);
--- /dev/null
+/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
+ * vim:expandtab:shiftwidth=8:tabstop=8:
+ *
+ * Copyright (c) 2007 Cluster File Systems, Inc.
+ * Author: Yury Umanets <umka@clusterfs.com>
+ *
+ * This file is part of the Lustre file system, http://www.lustre.org
+ * Lustre is a trademark of Cluster File Systems, Inc.
+ *
+ * You may have signed or agreed to another license before downloading
+ * this software. If so, you are bound by the terms and conditions
+ * of that agreement, and the following does not apply to you. See the
+ * LICENSE file included with this distribution for more information.
+ *
+ * If you did not agree to a different license, then this copy of Lustre
+ * is open source software; you can redistribute it and/or modify it
+ * under the terms of version 2 of the GNU General Public License as
+ * published by the Free Software Foundation.
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+
+/* The idea of this code is rather simple. Each second, for each server
+ * namespace we compute SLV - the server lock volume - from the current number
+ * of granted locks, the grant speed over the last period, etc., that is, from
+ * the locking load. For simplicity, this SLV number may be thought of as a
+ * flow definition. It is sent to clients at every opportunity to let them
+ * know the current load situation on the server. Initially, SLV on the server
+ * is set to a maximum value computed as follows: allow one client to hold all
+ * ->pl_limit locks for 10h.
+ *
+ * Next, on clients, the number of cached locks is no longer limited
+ * artificially as it was before. Instead, the client calculates CLV - the
+ * client lock volume - for each lock and compares it with the last SLV
+ * received from the server. CLV is calculated as the number of locks in the
+ * LRU * the lock's live time in seconds. If CLV > SLV, the lock is canceled.
+ *
+ * The client has LVF - the lock volume factor - which regulates how sensitive
+ * the client should be to the last SLV from the server. The higher LVF is,
+ * the more locks will be canceled on the client. Its default value is 1.
+ * Setting LVF to 2 means the client will cancel locks twice as fast.
+ *
+ * Locks on a client will be canceled more intensively in these cases:
+ * (1) if SLV is smaller, that is, load is higher on the server;
+ * (2) the client has a lot of locks (the more locks a client holds, the
+ * bigger the chance that some of them should be canceled);
+ * (3) the client has old locks (taken some time ago).
+ *
+ * Thus, in the flow paradigm that we use to better understand SLV, CLV is the
+ * volume of a particle in the flow described by SLV. Accordingly, if the flow
+ * is getting thinner, more and more particles fall outside of it, and as the
+ * particles are locks, they should be canceled.
+ *
+ * The general idea belongs to Vitaly Fertman (vitaly@clusterfs.com). Andreas
+ * Dilger (adilger@clusterfs.com) proposed a few nice ideas such as using LVF
+ * and many cleanups. The flow definition, which makes the logic easier to
+ * understand, belongs to Nikita Danilov (nikita@clusterfs.com), as do many
+ * cleanups and fixes. The design and implementation are by Yury Umanets
+ * (umka@clusterfs.com).
+ *
+ * Glossary for terms used:
+ *
+ * pl_limit - Number of allowed locks in pool. Applies to server and client
+ * side (tunable);
+ *
+ * pl_granted - Number of granted locks (calculated);
+ * pl_grant_rate - Number of granted locks for last T (calculated);
+ * pl_cancel_rate - Number of canceled locks for last T (calculated);
+ * pl_grant_speed - Grant speed (GR - CR) for last T (calculated);
+ * pl_grant_plan - Planned number of granted locks for next T (calculated);
+ *
+ * pl_grant_step - Grant plan step, that is how ->pl_grant_plan
+ * will change in next T (tunable);
+ *
+ * pl_server_lock_volume - Current server lock volume (calculated);
+ *
+ * As can be seen from the list above, we have a few tunables that may affect
+ * behavior significantly. They can all be modified via proc. They also make it
+ * possible to construct a few pre-defined behavior policies. If none of the
+ * predefined policies suits the working pattern in use, a new one may be
+ * "constructed" via the proc tunables.
+ */
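+
+/* An illustrative example of the client-side decision (the numbers are
+ * hypothetical and not part of this patch): suppose the last SLV received
+ * from the server is 100000 and the client caches 1000 unused locks with
+ * LVF = 1. A lock that has sat unused in the LRU for 200 seconds has
+ * CLV = lvf * la * unused = 1 * 200 * 1000 = 200000 > SLV, so it is
+ * canceled; a lock unused for only 50 seconds has CLV = 50000 < SLV and
+ * stays cached (see ldlm_cancel_lru_local() for the actual check). */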
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#ifdef __KERNEL__
+# include <lustre_dlm.h>
+#else
+# include <liblustre.h>
+# include <libcfs/kp30.h>
+#endif
+
+#include <obd_class.h>
+#include <obd_support.h>
+#include "ldlm_internal.h"
+
+#ifdef HAVE_LRU_RESIZE_SUPPORT
+
+/* 50 ldlm locks for 1MB of RAM. */
+#define LDLM_POOL_HOST_L ((num_physpages >> (20 - PAGE_SHIFT)) * 50)
+
+/* Default step in % for grant plan. */
+#define LDLM_POOL_GSP (5)
+
+/* LDLM_POOL_GSP% of all locks is default GP. */
+#define LDLM_POOL_GP(L) ((L) * LDLM_POOL_GSP / 100)
+
+/* Max age for locks on clients. */
+#define LDLM_POOL_MAX_AGE (36000)
+
+#ifdef __KERNEL__
+extern cfs_proc_dir_entry_t *ldlm_ns_proc_dir;
+#endif
+
+extern atomic_t ldlm_srv_namespace_nr;
+extern atomic_t ldlm_cli_namespace_nr;
+extern struct list_head ldlm_namespace_list;
+extern struct semaphore ldlm_namespace_lock;
+
+#define avg(src, add) \
+ ((src) = ((src) + (add)) / 2)
+
+static inline __u64 dru(__u64 val, __u32 div)
+{
+ __u64 ret = val + (div - 1);
+ do_div(ret, div);
+ return ret;
+}
+
+static inline __u64 ldlm_pool_slv_max(__u32 L)
+{
+ /* Allow one client to hold all locks for 10 hrs.
+ * Formula: limit * 10h / 1 client. */
+ __u64 lim = L * LDLM_POOL_MAX_AGE / 1;
+ return lim;
+}
+
+static inline __u64 ldlm_pool_slv_min(__u32 L)
+{
+ return 1;
+}
+
+enum {
+ LDLM_POOL_GRANTED_STAT = 0,
+ LDLM_POOL_GRANT_RATE_STAT,
+ LDLM_POOL_CANCEL_RATE_STAT,
+ LDLM_POOL_GRANT_PLAN_STAT,
+ LDLM_POOL_SLV_STAT,
+ LDLM_POOL_LAST_STAT
+};
+
+static inline struct ldlm_namespace *ldlm_pl2ns(struct ldlm_pool *pl)
+{
+ return container_of(pl, struct ldlm_namespace, ns_pool);
+}
+
+static int ldlm_srv_pool_recalc(struct ldlm_pool *pl)
+{
+ int slv_factor, limit, granted, grant_speed;
+ int grant_rate, cancel_rate, grant_step;
+ time_t recalc_interval_sec;
+ __u32 grant_plan;
+ __u64 slv;
+ ENTRY;
+
+ spin_lock(&pl->pl_lock);
+
+ /* Copy all values into local variables to prevent them changing in
+ * the middle of the recalculation. */
+ slv = ldlm_pool_get_slv(pl);
+ limit = ldlm_pool_get_limit(pl);
+ granted = atomic_read(&pl->pl_granted);
+ grant_rate = atomic_read(&pl->pl_grant_rate);
+ grant_plan = atomic_read(&pl->pl_grant_plan);
+ grant_step = atomic_read(&pl->pl_grant_step);
+ grant_speed = atomic_read(&pl->pl_grant_speed);
+ cancel_rate = atomic_read(&pl->pl_cancel_rate);
+
+ /* Zero out grant/cancel rates and speed for this T. */
+ atomic_set(&pl->pl_grant_rate, 0);
+ atomic_set(&pl->pl_cancel_rate, 0);
+ atomic_set(&pl->pl_grant_speed, 0);
+
+ /* Make sure we use correct data for statistics. The pools thread may
+ * not have been scheduled for a long time due to heavy CPU contention;
+ * we need to account for this. */
+ recalc_interval_sec = cfs_duration_sec(cfs_time_current() -
+ pl->pl_update_time);
+ if (recalc_interval_sec == 0)
+ recalc_interval_sec = 1;
+
+ lprocfs_counter_add(pl->pl_stats, LDLM_POOL_SLV_STAT, slv);
+ lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
+ granted);
+ lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
+ grant_rate / recalc_interval_sec);
+ lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT,
+ grant_plan / recalc_interval_sec);
+ lprocfs_counter_add(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
+ cancel_rate / recalc_interval_sec);
+
+ /* Correct the old @grant_plan, which may be obsolete under heavy server
+ * load when the pools thread is not scheduled exactly every 1s (the
+ * current period). All other values used in the calculation are updated
+ * by other threads and are up to date; only @grant_plan is calculated by
+ * the pools thread and directly affects SLV. */
+ grant_plan += grant_speed - (grant_speed / recalc_interval_sec);
+
+ if ((slv_factor = limit - (granted - grant_plan)) <= 0)
+ slv_factor = 1;
+
+ grant_plan = granted + ((limit - granted) * grant_step) / 100;
+ slv = (slv * ((slv_factor * 100) / limit));
+ slv = dru(slv, 100);
+
+ if (slv > ldlm_pool_slv_max(limit)) {
+ CDEBUG(D_DLMTRACE, "Correcting SLV to allowed max "LPU64"\n",
+ ldlm_pool_slv_max(limit));
+ slv = ldlm_pool_slv_max(limit);
+ } else if (slv < ldlm_pool_slv_min(limit)) {
+ CDEBUG(D_DLMTRACE, "Correcting SLV to allowed min "LPU64"\n",
+ ldlm_pool_slv_min(limit));
+ slv = ldlm_pool_slv_min(limit);
+ }
+
+ ldlm_pool_set_slv(pl, slv);
+ atomic_set(&pl->pl_grant_plan, grant_plan);
+ pl->pl_update_time = cfs_time_current();
+ spin_unlock(&pl->pl_lock);
+
+ RETURN(0);
+}
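+
+/* Illustrative walk-through of the recalculation above (hypothetical numbers,
+ * not from a real run), assuming recalc_interval_sec == 1 so the grant_speed
+ * correction of grant_plan is zero: with limit = 10000, granted = 4000 and
+ * grant_plan = 4000, slv_factor = limit - (granted - grant_plan) = 10000 and
+ * slv = dru(slv * ((slv_factor * 100) / limit), 100) leaves SLV unchanged.
+ * If granted jumps to 9000 while grant_plan is still 4000, slv_factor = 5000
+ * and SLV is roughly halved, telling clients to start canceling cached locks.
+ * With grant_step = 5, the new grant_plan becomes
+ * granted + ((limit - granted) * grant_step) / 100 = 9000 + 50 = 9050. */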
+
+/* Our goal here is to decrease SLV such that clients hold @nr fewer
+ * locks over the next 10h. */
+static int ldlm_srv_pool_shrink(struct ldlm_pool *pl,
+ int nr, unsigned int gfp_mask)
+{
+ __u32 granted, limit;
+ __u64 slv_delta;
+ ENTRY;
+
+ /* The client has already canceled its locks, but the server is already
+ * in the shrinker and has nothing to cancel. Catch this race. */
+ if ((granted = atomic_read(&pl->pl_granted)) == 0)
+ RETURN(0);
+
+ spin_lock(&pl->pl_lock);
+
+ /* A simple proportion, but it gives an idea of how much SLV should
+ * change for the requested @nr locks to be canceled. */
+ slv_delta = nr * ldlm_pool_get_slv(pl);
+ limit = ldlm_pool_get_limit(pl);
+ do_div(slv_delta, granted);
+
+ /* As SLV depends on historical data (the new value is based on the
+ * old one), this decrease will make clients return some locks to the
+ * server, and after some time things will stabilize. */
+ if (slv_delta < ldlm_pool_get_slv(pl))
+ ldlm_pool_set_slv(pl, ldlm_pool_get_slv(pl) - slv_delta);
+ else
+ ldlm_pool_set_slv(pl, ldlm_pool_slv_min(limit));
+ spin_unlock(&pl->pl_lock);
+
+ /* We have not actually freed any memory here so far; it may only be
+ * freed later, so return 0 to avoid confusing the VM. */
+ RETURN(0);
+}
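+
+/* Example of the proportion above (hypothetical numbers, not part of the
+ * original patch): with granted = 10000, SLV = 50000 and a shrink request of
+ * nr = 1000, slv_delta = nr * SLV / granted = 5000, so SLV drops to 45000,
+ * i.e. by the same 10% share of SLV that @nr represents of the granted
+ * locks. */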
+
+static int ldlm_cli_pool_recalc(struct ldlm_pool *pl)
+{
+ int grant_rate, cancel_rate;
+ time_t recalc_interval_sec;
+ ENTRY;
+
+ spin_lock(&pl->pl_lock);
+ grant_rate = atomic_read(&pl->pl_grant_rate);
+ cancel_rate = atomic_read(&pl->pl_cancel_rate);
+
+ recalc_interval_sec = cfs_duration_sec(cfs_time_current() -
+ pl->pl_update_time);
+ if (recalc_interval_sec == 0)
+ recalc_interval_sec = 1;
+
+ lprocfs_counter_add(pl->pl_stats, LDLM_POOL_SLV_STAT,
+ ldlm_pool_get_slv(pl));
+ lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
+ atomic_read(&pl->pl_granted));
+ lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
+ grant_rate / recalc_interval_sec);
+ lprocfs_counter_add(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
+ cancel_rate / recalc_interval_sec);
+
+ spin_unlock(&pl->pl_lock);
+
+ ldlm_cancel_lru(ldlm_pl2ns(pl), 0, LDLM_ASYNC);
+ RETURN(0);
+}
+
+static int ldlm_cli_pool_shrink(struct ldlm_pool *pl,
+ int nr, unsigned int gfp_mask)
+{
+ ENTRY;
+ RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), nr, LDLM_SYNC));
+}
+
+int ldlm_pool_recalc(struct ldlm_pool *pl)
+{
+ if (pl->pl_recalc != NULL && pool_recalc_enabled(pl))
+ return pl->pl_recalc(pl);
+ return 0;
+}
+EXPORT_SYMBOL(ldlm_pool_recalc);
+
+int ldlm_pool_shrink(struct ldlm_pool *pl, int nr,
+ unsigned int gfp_mask)
+{
+ if (pl->pl_shrink != NULL && pool_shrink_enabled(pl)) {
+ CDEBUG(D_DLMTRACE, "%s: request to shrink %d locks\n",
+ pl->pl_name, nr);
+ return pl->pl_shrink(pl, nr, gfp_mask);
+ }
+ return 0;
+}
+EXPORT_SYMBOL(ldlm_pool_shrink);
+
+/* Re-set the pool limit and the maximum allowed SLV according to the
+ * passed limit. */
+int ldlm_pool_setup(struct ldlm_pool *pl, __u32 limit)
+{
+ ENTRY;
+ if (ldlm_pl2ns(pl)->ns_client == LDLM_NAMESPACE_SERVER) {
+ spin_lock(&pl->pl_lock);
+ ldlm_pool_set_limit(pl, limit);
+ spin_unlock(&pl->pl_lock);
+ }
+ RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_pool_setup);
+
+#ifdef __KERNEL__
+static int lprocfs_rd_pool_state(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ int nr = 0, granted, grant_rate, cancel_rate;
+ int grant_speed, grant_plan, grant_step;
+ struct ldlm_pool *pl = data;
+ __u32 limit;
+ __u64 slv;
+
+ spin_lock(&pl->pl_lock);
+ slv = pl->pl_server_lock_volume;
+ limit = ldlm_pool_get_limit(pl);
+ granted = atomic_read(&pl->pl_granted);
+ grant_rate = atomic_read(&pl->pl_grant_rate);
+ cancel_rate = atomic_read(&pl->pl_cancel_rate);
+ grant_speed = atomic_read(&pl->pl_grant_speed);
+ grant_plan = atomic_read(&pl->pl_grant_plan);
+ grant_step = atomic_read(&pl->pl_grant_step);
+ spin_unlock(&pl->pl_lock);
+
+ nr += snprintf(page + nr, count - nr, "LDLM pool state (%s):\n",
+ pl->pl_name);
+ nr += snprintf(page + nr, count - nr, " SLV: "LPU64"\n", slv);
+ if (ldlm_pl2ns(pl)->ns_client == LDLM_NAMESPACE_SERVER) {
+ nr += snprintf(page + nr, count - nr, " GSP: %d%%\n",
+ grant_step);
+ nr += snprintf(page + nr, count - nr, " GP: %d\n",
+ grant_plan);
+ } else {
+ nr += snprintf(page + nr, count - nr, " LVF: %d\n",
+ atomic_read(&pl->pl_lock_volume_factor));
+ }
+ nr += snprintf(page + nr, count - nr, " GR: %d\n", grant_rate);
+ nr += snprintf(page + nr, count - nr, " CR: %d\n", cancel_rate);
+ nr += snprintf(page + nr, count - nr, " GS: %d\n", grant_speed);
+ nr += snprintf(page + nr, count - nr, " G: %d\n", granted);
+ nr += snprintf(page + nr, count - nr, " L: %d\n", limit);
+ return nr;
+}
+
+static int ldlm_pool_proc_init(struct ldlm_pool *pl)
+{
+ struct ldlm_namespace *ns = ldlm_pl2ns(pl);
+ struct proc_dir_entry *parent_ns_proc;
+ struct lprocfs_vars pool_vars[2];
+ char *var_name = NULL;
+ int rc = 0;
+ ENTRY;
+
+ OBD_ALLOC(var_name, MAX_STRING_SIZE + 1);
+ if (!var_name)
+ RETURN(-ENOMEM);
+
+ parent_ns_proc = lprocfs_srch(ldlm_ns_proc_dir, ns->ns_name);
+ if (parent_ns_proc == NULL) {
+ CERROR("%s: proc entry is not initialized\n",
+ ns->ns_name);
+ GOTO(out_free_name, rc = -EINVAL);
+ }
+ pl->pl_proc_dir = lprocfs_register("pool", parent_ns_proc,
+ NULL, NULL);
+ if (IS_ERR(pl->pl_proc_dir)) {
+ CERROR("LProcFS failed in ldlm-pool-init\n");
+ rc = PTR_ERR(pl->pl_proc_dir);
+ GOTO(out_free_name, rc);
+ }
+
+ var_name[MAX_STRING_SIZE] = '\0';
+ memset(pool_vars, 0, sizeof(pool_vars));
+ pool_vars[0].name = var_name;
+
+ snprintf(var_name, MAX_STRING_SIZE, "server_lock_volume");
+ pool_vars[0].data = &pl->pl_server_lock_volume;
+ pool_vars[0].read_fptr = lprocfs_rd_u64;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+
+ snprintf(var_name, MAX_STRING_SIZE, "limit");
+ pool_vars[0].data = &pl->pl_limit;
+ pool_vars[0].read_fptr = lprocfs_rd_atomic;
+ pool_vars[0].write_fptr = lprocfs_wr_atomic;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+
+ snprintf(var_name, MAX_STRING_SIZE, "granted");
+ pool_vars[0].data = &pl->pl_granted;
+ pool_vars[0].read_fptr = lprocfs_rd_atomic;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+
+ snprintf(var_name, MAX_STRING_SIZE, "control");
+ pool_vars[0].data = &pl->pl_control;
+ pool_vars[0].read_fptr = lprocfs_rd_uint;
+ pool_vars[0].write_fptr = lprocfs_wr_uint;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+
+ snprintf(var_name, MAX_STRING_SIZE, "grant_speed");
+ pool_vars[0].data = &pl->pl_grant_speed;
+ pool_vars[0].read_fptr = lprocfs_rd_atomic;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+
+ snprintf(var_name, MAX_STRING_SIZE, "cancel_rate");
+ pool_vars[0].data = &pl->pl_cancel_rate;
+ pool_vars[0].read_fptr = lprocfs_rd_atomic;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+
+ snprintf(var_name, MAX_STRING_SIZE, "grant_rate");
+ pool_vars[0].data = &pl->pl_grant_rate;
+ pool_vars[0].read_fptr = lprocfs_rd_atomic;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+
+ if (ns->ns_client == LDLM_NAMESPACE_SERVER) {
+ snprintf(var_name, MAX_STRING_SIZE, "grant_plan");
+ pool_vars[0].data = &pl->pl_grant_plan;
+ pool_vars[0].read_fptr = lprocfs_rd_atomic;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+
+ snprintf(var_name, MAX_STRING_SIZE, "grant_step");
+ pool_vars[0].data = &pl->pl_grant_step;
+ pool_vars[0].read_fptr = lprocfs_rd_atomic;
+ pool_vars[0].write_fptr = lprocfs_wr_atomic;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+ } else {
+ snprintf(var_name, MAX_STRING_SIZE, "lock_volume_factor");
+ pool_vars[0].data = &pl->pl_lock_volume_factor;
+ pool_vars[0].read_fptr = lprocfs_rd_uint;
+ pool_vars[0].write_fptr = lprocfs_wr_uint;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+ }
+
+ snprintf(var_name, MAX_STRING_SIZE, "state");
+ pool_vars[0].data = pl;
+ pool_vars[0].read_fptr = lprocfs_rd_pool_state;
+ lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0);
+
+ pl->pl_stats = lprocfs_alloc_stats(LDLM_POOL_LAST_STAT -
+ LDLM_POOL_GRANTED_STAT);
+ if (!pl->pl_stats)
+ GOTO(out_free_name, rc = -ENOMEM);
+
+ lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
+ LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
+ "granted", "locks");
+ lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
+ LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
+ "grant_rate", "locks/s");
+ lprocfs_counter_init(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
+ LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
+ "cancel_rate", "locks/s");
+ lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT,
+ LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
+ "grant_plan", "locks/s");
+ lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SLV_STAT,
+ LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV,
+ "slv", "slv");
+ lprocfs_register_stats(pl->pl_proc_dir, "stats", pl->pl_stats);
+
+ EXIT;
+out_free_name:
+ OBD_FREE(var_name, MAX_STRING_SIZE + 1);
+ return rc;
+}
+
+static void ldlm_pool_proc_fini(struct ldlm_pool *pl)
+{
+ if (pl->pl_stats != NULL) {
+ lprocfs_free_stats(&pl->pl_stats);
+ pl->pl_stats = NULL;
+ }
+ if (pl->pl_proc_dir != NULL) {
+ lprocfs_remove(&pl->pl_proc_dir);
+ pl->pl_proc_dir = NULL;
+ }
+}
+#else /* !__KERNEL__ */
+#define ldlm_pool_proc_init(pl) (0)
+#define ldlm_pool_proc_fini(pl) while (0) {}
+#endif
+
+int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
+ int idx, ldlm_side_t client)
+{
+ int rc;
+ ENTRY;
+
+ spin_lock_init(&pl->pl_lock);
+ atomic_set(&pl->pl_granted, 0);
+ pl->pl_update_time = cfs_time_current();
+ atomic_set(&pl->pl_lock_volume_factor, 1);
+
+ atomic_set(&pl->pl_grant_rate, 0);
+ atomic_set(&pl->pl_cancel_rate, 0);
+ atomic_set(&pl->pl_grant_speed, 0);
+ pl->pl_control = LDLM_POOL_CTL_FULL;
+ atomic_set(&pl->pl_grant_step, LDLM_POOL_GSP);
+ atomic_set(&pl->pl_grant_plan, LDLM_POOL_GP(LDLM_POOL_HOST_L));
+
+ snprintf(pl->pl_name, sizeof(pl->pl_name), "ldlm-pool-%s-%d",
+ ns->ns_name, idx);
+
+ if (client == LDLM_NAMESPACE_SERVER) {
+ pl->pl_recalc = ldlm_srv_pool_recalc;
+ pl->pl_shrink = ldlm_srv_pool_shrink;
+ ldlm_pool_set_limit(pl, LDLM_POOL_HOST_L);
+ ldlm_pool_set_slv(pl, ldlm_pool_slv_max(LDLM_POOL_HOST_L));
+ } else {
+ ldlm_pool_set_slv(pl, 1);
+ ldlm_pool_set_limit(pl, 1);
+ pl->pl_recalc = ldlm_cli_pool_recalc;
+ pl->pl_shrink = ldlm_cli_pool_shrink;
+ }
+
+ rc = ldlm_pool_proc_init(pl);
+ if (rc)
+ RETURN(rc);
+
+ CDEBUG(D_DLMTRACE, "Lock pool %s is initialized\n", pl->pl_name);
+
+ RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_pool_init);
+
+void ldlm_pool_fini(struct ldlm_pool *pl)
+{
+ ENTRY;
+ ldlm_pool_proc_fini(pl);
+ pl->pl_recalc = NULL;
+ pl->pl_shrink = NULL;
+ EXIT;
+}
+EXPORT_SYMBOL(ldlm_pool_fini);
+
+void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
+{
+ ENTRY;
+ atomic_inc(&pl->pl_granted);
+ atomic_inc(&pl->pl_grant_rate);
+ atomic_inc(&pl->pl_grant_speed);
+ EXIT;
+}
+EXPORT_SYMBOL(ldlm_pool_add);
+
+void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
+{
+ ENTRY;
+ LASSERT(atomic_read(&pl->pl_granted) > 0);
+ atomic_dec(&pl->pl_granted);
+ atomic_inc(&pl->pl_cancel_rate);
+ atomic_dec(&pl->pl_grant_speed);
+ EXIT;
+}
+EXPORT_SYMBOL(ldlm_pool_del);
+
+/* ->pl_lock should be taken. */
+__u64 ldlm_pool_get_slv(struct ldlm_pool *pl)
+{
+ return pl->pl_server_lock_volume;
+}
+EXPORT_SYMBOL(ldlm_pool_get_slv);
+
+/* ->pl_lock should be taken. */
+void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
+{
+ pl->pl_server_lock_volume = slv;
+}
+EXPORT_SYMBOL(ldlm_pool_set_slv);
+
+__u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
+{
+ return atomic_read(&pl->pl_limit);
+}
+EXPORT_SYMBOL(ldlm_pool_get_limit);
+
+void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
+{
+ atomic_set(&pl->pl_limit, limit);
+}
+EXPORT_SYMBOL(ldlm_pool_set_limit);
+
+/* Server side is only enabled for kernel space for now. */
+#ifdef __KERNEL__
+static int ldlm_pool_granted(struct ldlm_pool *pl)
+{
+ return atomic_read(&pl->pl_granted);
+}
+
+static struct ptlrpc_thread *ldlm_pools_thread;
+static struct shrinker *ldlm_pools_shrinker;
+static struct completion ldlm_pools_comp;
+
+static int ldlm_pools_thread_main(void *arg)
+{
+ struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg;
+ char *t_name = "ldlm_poold";
+ ENTRY;
+
+ cfs_daemonize(t_name);
+ thread->t_flags = SVC_RUNNING;
+ cfs_waitq_signal(&thread->t_ctl_waitq);
+
+ CDEBUG(D_DLMTRACE, "%s: pool thread starting, process %d\n",
+ t_name, cfs_curproc_pid());
+
+ while (1) {
+ __u32 nr_l = 0, nr_p = 0, l;
+ struct ldlm_namespace *ns;
+ struct l_wait_info lwi;
+ int rc, equal = 0;
+
+ /* Check all namespaces. */
+ mutex_down(&ldlm_namespace_lock);
+ list_for_each_entry(ns, &ldlm_namespace_list, ns_list_chain) {
+ if (ns->ns_appetite != LDLM_NAMESPACE_MODEST)
+ continue;
+
+ if (ns->ns_client == LDLM_NAMESPACE_SERVER) {
+ l = ldlm_pool_granted(&ns->ns_pool);
+ if (l == 0)
+ l = 1;
+
+ /* Set the modest pools limit equal to
+ * their avg granted locks + 5%. */
+ l += dru(l * LDLM_POOLS_MODEST_MARGIN, 100);
+ ldlm_pool_setup(&ns->ns_pool, l);
+ nr_l += l;
+ nr_p++;
+ }
+
+ /* After setup is done - recalc the pool. */
+ rc = ldlm_pool_recalc(&ns->ns_pool);
+ if (rc)
+ CERROR("%s: pool recalculation error "
+ "%d\n", ns->ns_pool.pl_name, rc);
+ }
+
+ if (nr_l >= 2 * (LDLM_POOL_HOST_L / 3)) {
+ CWARN("Modest pools eat out 2/3 of locks limit. %d of %lu. "
+ "Upgrade server!\n", nr_l, LDLM_POOL_HOST_L);
+ equal = 1;
+ }
+
+ list_for_each_entry(ns, &ldlm_namespace_list, ns_list_chain) {
+ if (!equal && ns->ns_appetite != LDLM_NAMESPACE_GREEDY)
+ continue;
+
+ if (ns->ns_client == LDLM_NAMESPACE_SERVER) {
+ if (equal) {
+ /* If modest pools have eaten 2/3 of the lock
+ * limit, set an equal limit for _all_ pools. */
+ l = LDLM_POOL_HOST_L /
+ atomic_read(&ldlm_srv_namespace_nr);
+ } else {
+ /* The remaining greedy pools split the
+ * remaining lock limit in equal parts. */
+ l = (LDLM_POOL_HOST_L - nr_l) /
+ (atomic_read(&ldlm_srv_namespace_nr) -
+ nr_p);
+ }
+ ldlm_pool_setup(&ns->ns_pool, l);
+ }
+
+ /* After setup is done - recalc the pool. */
+ rc = ldlm_pool_recalc(&ns->ns_pool);
+ if (rc)
+ CERROR("%s: pool recalculation error "
+ "%d\n", ns->ns_pool.pl_name, rc);
+ }
+ mutex_up(&ldlm_namespace_lock);
+
+ /* Wait until the next check time, or until we're
+ * stopped. */
+ lwi = LWI_TIMEOUT(cfs_time_seconds(LDLM_POOLS_THREAD_PERIOD),
+ NULL, NULL);
+ l_wait_event(thread->t_ctl_waitq, (thread->t_flags &
+ (SVC_STOPPING|SVC_EVENT)),
+ &lwi);
+
+ if (thread->t_flags & SVC_STOPPING) {
+ thread->t_flags &= ~SVC_STOPPING;
+ break;
+ } else if (thread->t_flags & SVC_EVENT) {
+ thread->t_flags &= ~SVC_EVENT;
+ }
+ }
+
+ thread->t_flags = SVC_STOPPED;
+ cfs_waitq_signal(&thread->t_ctl_waitq);
+
+ CDEBUG(D_DLMTRACE, "%s: pool thread exiting, process %d\n",
+ t_name, cfs_curproc_pid());
+
+ complete_and_exit(&ldlm_pools_comp, 0);
+}
+
+static int ldlm_pools_thread_start(ldlm_side_t client)
+{
+ struct l_wait_info lwi = { 0 };
+ int rc;
+ ENTRY;
+
+ if (ldlm_pools_thread != NULL)
+ RETURN(-EALREADY);
+
+ OBD_ALLOC_PTR(ldlm_pools_thread);
+ if (ldlm_pools_thread == NULL)
+ RETURN(-ENOMEM);
+
+ ldlm_pools_thread->t_id = client;
+ init_completion(&ldlm_pools_comp);
+ cfs_waitq_init(&ldlm_pools_thread->t_ctl_waitq);
+
+ /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we
+ * just drop the VM and FILES in ptlrpc_daemonize() right away. */
+ rc = cfs_kernel_thread(ldlm_pools_thread_main, ldlm_pools_thread,
+ CLONE_VM | CLONE_FILES);
+ if (rc < 0) {
+ CERROR("Can't start pool thread, error %d\n",
+ rc);
+ OBD_FREE(ldlm_pools_thread, sizeof(*ldlm_pools_thread));
+ ldlm_pools_thread = NULL;
+ RETURN(rc);
+ }
+ l_wait_event(ldlm_pools_thread->t_ctl_waitq,
+ (ldlm_pools_thread->t_flags & SVC_RUNNING), &lwi);
+ RETURN(0);
+}
+
+static void ldlm_pools_thread_stop(void)
+{
+ ENTRY;
+
+ if (ldlm_pools_thread == NULL) {
+ EXIT;
+ return;
+ }
+
+ ldlm_pools_thread->t_flags = SVC_STOPPING;
+ cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq);
+
+ /* Make sure that pools thread is finished before freeing @thread.
+ * This fixes possible race and oops due to accessing freed memory
+ * in pools thread. */
+ wait_for_completion(&ldlm_pools_comp);
+ OBD_FREE_PTR(ldlm_pools_thread);
+ ldlm_pools_thread = NULL;
+ EXIT;
+}
+
+int ldlm_pools_init(ldlm_side_t client)
+{
+ int rc;
+ ENTRY;
+
+ rc = ldlm_pools_thread_start(client);
+ if (rc == 0)
+ ldlm_pools_shrinker = set_shrinker(DEFAULT_SEEKS,
+ ldlm_pools_shrink);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(ldlm_pools_init);
+
+void ldlm_pools_fini(void)
+{
+ if (ldlm_pools_shrinker != NULL) {
+ remove_shrinker(ldlm_pools_shrinker);
+ ldlm_pools_shrinker = NULL;
+ }
+ ldlm_pools_thread_stop();
+}
+EXPORT_SYMBOL(ldlm_pools_fini);
+
+void ldlm_pools_wakeup(void)
+{
+ ENTRY;
+ if (ldlm_pools_thread == NULL)
+ return;
+ ldlm_pools_thread->t_flags |= SVC_EVENT;
+ cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq);
+ EXIT;
+}
+EXPORT_SYMBOL(ldlm_pools_wakeup);
+
+/* Cancel @nr locks from all namespaces (if possible). Returns the number of
+ * cached locks remaining after the shrink is finished. Each namespace is asked
+ * to cancel a number of locks roughly proportional to how many it caches. */
+int ldlm_pools_shrink(int nr, unsigned int gfp_mask)
+{
+ struct ldlm_namespace *ns;
+ int total = 0, cached = 0;
+
+ if (nr != 0 && !(gfp_mask & __GFP_FS))
+ return -1;
+
+ CDEBUG(D_DLMTRACE, "request to shrink %d locks from all pools\n",
+ nr);
+ mutex_down(&ldlm_namespace_lock);
+ list_for_each_entry(ns, &ldlm_namespace_list, ns_list_chain)
+ total += ldlm_pool_granted(&ns->ns_pool);
+
+ if (nr == 0) {
+ mutex_up(&ldlm_namespace_lock);
+ return total;
+ }
+
+ /* Check all namespaces. */
+ list_for_each_entry(ns, &ldlm_namespace_list, ns_list_chain) {
+ struct ldlm_pool *pl = &ns->ns_pool;
+ int cancel, nr_locks;
+
+ nr_locks = ldlm_pool_granted(&ns->ns_pool);
+ cancel = 1 + nr_locks * nr / total;
+ cancel = ldlm_pool_shrink(pl, cancel, gfp_mask);
+ cached += ldlm_pool_granted(&ns->ns_pool);
+ }
+ mutex_up(&ldlm_namespace_lock);
+ return cached;
+}
+EXPORT_SYMBOL(ldlm_pools_shrink);
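+
+/* Example of the per-namespace split above (hypothetical numbers, not part of
+ * the original patch): with two namespaces caching 3000 and 1000 locks
+ * (total = 4000) and a request of nr = 400, the pools are asked to cancel
+ * 1 + 3000 * 400 / 4000 = 301 and 1 + 1000 * 400 / 4000 = 101 locks
+ * respectively, i.e. roughly in proportion to the number of locks each
+ * namespace caches. */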
+#endif /* __KERNEL__ */
+
+#else /* !HAVE_LRU_RESIZE_SUPPORT */
+int ldlm_pool_setup(struct ldlm_pool *pl, __u32 limit)
+{
+ return 0;
+}
+EXPORT_SYMBOL(ldlm_pool_setup);
+
+int ldlm_pool_recalc(struct ldlm_pool *pl)
+{
+ return 0;
+}
+EXPORT_SYMBOL(ldlm_pool_recalc);
+
+int ldlm_pool_shrink(struct ldlm_pool *pl,
+ int nr, unsigned int gfp_mask)
+{
+ return 0;
+}
+EXPORT_SYMBOL(ldlm_pool_shrink);
+
+int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns,
+ int idx, ldlm_side_t client)
+{
+ return 0;
+}
+EXPORT_SYMBOL(ldlm_pool_init);
+
+void ldlm_pool_fini(struct ldlm_pool *pl)
+{
+ return;
+}
+EXPORT_SYMBOL(ldlm_pool_fini);
+
+void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock)
+{
+ return;
+}
+EXPORT_SYMBOL(ldlm_pool_add);
+
+void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock)
+{
+ return;
+}
+EXPORT_SYMBOL(ldlm_pool_del);
+
+__u64 ldlm_pool_get_slv(struct ldlm_pool *pl)
+{
+ return 1;
+}
+EXPORT_SYMBOL(ldlm_pool_get_slv);
+
+void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv)
+{
+ return;
+}
+EXPORT_SYMBOL(ldlm_pool_set_slv);
+
+__u32 ldlm_pool_get_limit(struct ldlm_pool *pl)
+{
+ return 0;
+}
+EXPORT_SYMBOL(ldlm_pool_get_limit);
+
+void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit)
+{
+ return;
+}
+EXPORT_SYMBOL(ldlm_pool_set_limit);
+
+int ldlm_pools_init(ldlm_side_t client)
+{
+ return 0;
+}
+EXPORT_SYMBOL(ldlm_pools_init);
+
+void ldlm_pools_fini(void)
+{
+ return;
+}
+EXPORT_SYMBOL(ldlm_pools_fini);
+
+void ldlm_pools_wakeup(void)
+{
+ return;
+}
+EXPORT_SYMBOL(ldlm_pools_wakeup);
+#endif /* HAVE_LRU_RESIZE_SUPPORT */
/* Estimate the amount of free space in the request. */
int avail = ldlm_req_handles_avail(exp, size, bufcount,
LDLM_ENQUEUE_CANCEL_OFF);
+
LASSERT(avail >= count);
/* Cancel lru locks here _only_ if the server supports
* EARLY_CANCEL. Otherwise we have to send extra CANCEL
* rpc right on enqueue, what will make it slower, vs.
* asynchronous rpc in blocking thread. */
- count += ldlm_cancel_lru_local(ns, cancels, 1, avail - count,
- LDLM_CANCEL_AGED);
+ count += ldlm_cancel_lru_local(ns, cancels,
+ exp_connect_lru_resize(exp) ? 0 : 1,
+ avail - count, LDLM_CANCEL_AGED);
size[DLM_LOCKREQ_OFF] =
ldlm_request_bufsize(count, LDLM_ENQUEUE);
}
return sent ? sent : rc;
}
+static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
+{
+ LASSERT(imp != NULL);
+ return &imp->imp_obd->obd_namespace->ns_pool;
+}
+
+int ldlm_cli_update_pool(struct ptlrpc_request *req)
+{
+ struct ldlm_pool *pl;
+ ENTRY;
+
+ if (!imp_connect_lru_resize(req->rq_import))
+ RETURN(0);
+
+ pl = ldlm_imp2pl(req->rq_import);
+
+ spin_lock(&pl->pl_lock);
+#ifdef __KERNEL__
+ {
+ __u64 old_slv, fast_slv_change;
+
+ old_slv = ldlm_pool_get_slv(pl);
+ fast_slv_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
+ do_div(fast_slv_change, 100);
+#endif
+ pl->pl_update_time = cfs_time_current();
+ ldlm_pool_set_slv(pl, lustre_msg_get_slv(req->rq_repmsg));
+ ldlm_pool_set_limit(pl, lustre_msg_get_limit(req->rq_repmsg));
+#ifdef __KERNEL__
+ /* Wake up the pools thread only if SLV has changed by more than
+ * 5% since the last update - in that case we want to react asap.
+ * Otherwise there is no sense in waking up the pools thread, as
+ * the pools are re-calculated every 1s anyway. */
+ if (old_slv > ldlm_pool_get_slv(pl) &&
+ old_slv - ldlm_pool_get_slv(pl) > fast_slv_change)
+ ldlm_pools_wakeup();
+ }
+#endif
+ spin_unlock(&pl->pl_lock);
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_cli_update_pool);
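+
+/* Example of the wakeup threshold above (hypothetical numbers, not part of
+ * the original patch): with old_slv = 100000, fast_slv_change =
+ * 100000 * 5 / 100 = 5000. A new SLV of 90000 (a drop of 10000 > 5000) wakes
+ * the pools thread immediately, while a new SLV of 97000 (a drop of only
+ * 3000) just waits for the next 1s recalculation. */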
+
int ldlm_cli_cancel(struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
- CFS_LIST_HEAD(head);
+ CFS_LIST_HEAD(cancels);
int rc = 0;
ENTRY;
if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
GOTO(out, rc);
- list_add(&lock->l_bl_ast, &head);
- rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1);
+ list_add(&lock->l_bl_ast, &cancels);
+ rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1);
EXIT;
out:
LDLM_LOCK_PUT(lock);
int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
int count, int max, int flags)
{
+ int rc, added = 0, left, unused;
cfs_time_t cur = cfs_time_current();
struct ldlm_lock *lock, *next;
- int rc, added = 0, left;
ENTRY;
spin_lock(&ns->ns_unused_lock);
- count += ns->ns_nr_unused - ns->ns_max_unused;
+ unused = ns->ns_nr_unused;
+
+ if (!ns_connect_lru_resize(ns))
+ count += unused - ns->ns_max_unused;
+
while (!list_empty(&ns->ns_unused_list)) {
+ struct ldlm_pool *pl;
+ __u64 slv, lvf, lv;
+
if (max && added >= max)
break;
if (&lock->l_lru == &ns->ns_unused_list)
break;
- if ((added >= count) &&
- (!(flags & LDLM_CANCEL_AGED) ||
- cfs_time_before_64(cur, (__u64)ns->ns_max_age +
- lock->l_last_used)))
- break;
+ pl = &ns->ns_pool;
+
+ if (ns_connect_lru_resize(ns)) {
+ cfs_time_t la;
+ /* Ignore slv when we are asked to cancel a particular
+ * number of locks (via proc) or when we have already
+ * scheduled @added locks for canceling. */
+ if (count != 0 && added > count)
+ break;
+
+ /* Calculate lv for every lock. */
+ spin_lock(&pl->pl_lock);
+ slv = ldlm_pool_get_slv(pl);
+ lvf = atomic_read(&pl->pl_lock_volume_factor);
+ spin_unlock(&pl->pl_lock);
+
+ la = cfs_duration_sec(cfs_time_sub(cur,
+ lock->l_last_used));
+ if (la == 0)
+ la = 1;
+
+ /* Stop when slv has not yet arrived from the server, or
+ * when lv is smaller than slv. */
+ lv = lvf * la * unused;
+ if (slv == 1 || lv < slv)
+ break;
+ } else {
+ if ((added >= count) &&
+ (!(flags & LDLM_CANCEL_AGED) ||
+ cfs_time_before_64(cur, (__u64)ns->ns_max_age +
+ lock->l_last_used)))
+ break;
+ }
+
LDLM_LOCK_GET(lock); /* dropped by bl thread */
spin_unlock(&ns->ns_unused_lock);
unlock_res_and_lock(lock);
spin_lock(&ns->ns_unused_lock);
added++;
+ unused--;
}
spin_unlock(&ns->ns_unused_lock);
LDLM_LOCK_PUT(lock);
added--;
}
-
}
RETURN(added);
}
* in a thread and this function will return after the thread has been
* asked to call the callback. when called with LDLM_SYNC the blocking
* callback will be performed in this function. */
-int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync)
+int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync)
{
CFS_LIST_HEAD(cancels);
int count, rc;
#ifndef __KERNEL__
sync = LDLM_SYNC; /* force to be sync in user space */
#endif
- count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0);
+ count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0);
if (sync == LDLM_ASYNC) {
struct ldlm_lock *lock, *next;
list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) {
/* If some locks are left in the list in ASYNC mode, or
* this is SYNC mode, cancel the list. */
ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF);
- RETURN(0);
+ RETURN(count);
}
/* Find and cancel locally unused locks found on resource, matched to the
CERROR("ldlm_cli_cancel_list: %d\n", res);
res = count;
}
-
+
count -= res;
ldlm_lock_list_put(cancels, l_bl_ast, res);
}
cfs_mem_cache_t *ldlm_resource_slab, *ldlm_lock_slab;
+atomic_t ldlm_srv_namespace_nr = ATOMIC_INIT(0);
+atomic_t ldlm_cli_namespace_nr = ATOMIC_INIT(0);
struct semaphore ldlm_namespace_lock;
struct list_head ldlm_namespace_list = CFS_LIST_HEAD_INIT(ldlm_namespace_list);
cfs_proc_dir_entry_t *ldlm_type_proc_dir = NULL;
lprocfs_remove(&ldlm_type_proc_dir);
}
-static int lprocfs_uint_rd(char *page, char **start, off_t off,
- int count, int *eof, void *data)
-{
- unsigned int *temp = (unsigned int *)data;
- return snprintf(page, count, "%u\n", *temp);
-}
-
-#define MAX_STRING_SIZE 128
-static int lprocfs_uint_wr(struct file *file, const char *buffer,
- unsigned long count, void *data)
-{
- unsigned *p = data;
- char dummy[MAX_STRING_SIZE + 1], *end;
- unsigned long tmp;
-
- dummy[MAX_STRING_SIZE] = '\0';
- if (copy_from_user(dummy, buffer, MAX_STRING_SIZE))
- return -EFAULT;
-
- tmp = simple_strtoul(dummy, &end, 0);
- if (dummy == end)
- return -EINVAL;
-
- *p = (unsigned int)tmp;
- return count;
-}
-
-static int lprocfs_read_lru_size(char *page, char **start, off_t off,
- int count, int *eof, void *data)
+static int lprocfs_rd_lru_size(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
{
struct ldlm_namespace *ns = data;
- return lprocfs_uint_rd(page, start, off, count, eof,
- &ns->ns_max_unused);
+ __u32 *nr = &ns->ns_max_unused;
+
+ if (ns_connect_lru_resize(ns))
+ nr = &ns->ns_nr_unused;
+ return lprocfs_rd_uint(page, start, off, count, eof, nr);
}
-static int lprocfs_write_lru_size(struct file *file, const char *buffer,
- unsigned long count, void *data)
+static int lprocfs_wr_lru_size(struct file *file, const char *buffer,
+ unsigned long count, void *data)
{
struct ldlm_namespace *ns = data;
char dummy[MAX_STRING_SIZE + 1], *end;
CDEBUG(D_DLMTRACE,
"dropping all unused locks from namespace %s\n",
ns->ns_name);
- tmp = ns->ns_max_unused;
- ns->ns_max_unused = 0;
- ldlm_cancel_lru(ns, LDLM_SYNC);
- ns->ns_max_unused = tmp;
+ if (ns_connect_lru_resize(ns)) {
+ /* Try to cancel all @ns_nr_unused locks. */
+ ldlm_cancel_lru(ns, ns->ns_nr_unused, LDLM_SYNC);
+ } else {
+ tmp = ns->ns_max_unused;
+ ns->ns_max_unused = 0;
+ ldlm_cancel_lru(ns, 0, LDLM_SYNC);
+ ns->ns_max_unused = tmp;
+ }
return count;
}
tmp = simple_strtoul(dummy, &end, 0);
- if (tmp == 0 && *end) {
+ if (*end) {
CERROR("invalid value written\n");
return -EINVAL;
}
- CDEBUG(D_DLMTRACE, "changing namespace %s max_unused from %u to %u\n",
- ns->ns_name, ns->ns_max_unused, (unsigned int)tmp);
- ns->ns_max_unused = (unsigned int)tmp;
-
- ldlm_cancel_lru(ns, LDLM_ASYNC);
-
+ if (ns_connect_lru_resize(ns)) {
+ if (tmp > ns->ns_nr_unused)
+ tmp = ns->ns_nr_unused;
+ tmp = ns->ns_nr_unused - tmp;
+
+ CDEBUG(D_DLMTRACE, "changing namespace %s unused locks from %u to %u\n",
+ ns->ns_name, ns->ns_nr_unused, (unsigned int)tmp);
+ ldlm_cancel_lru(ns, (unsigned int)tmp, LDLM_ASYNC);
+ } else {
+ CDEBUG(D_DLMTRACE, "changing namespace %s max_unused from %u to %u\n",
+ ns->ns_name, ns->ns_max_unused, (unsigned int)tmp);
+ ns->ns_max_unused = (unsigned int)tmp;
+ ldlm_cancel_lru(ns, 0, LDLM_ASYNC);
+ }
return count;
}
snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_unused_count",
ns->ns_name);
lock_vars[0].data = &ns->ns_nr_unused;
- lock_vars[0].read_fptr = lprocfs_uint_rd;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_size",
ns->ns_name);
lock_vars[0].data = ns;
- lock_vars[0].read_fptr = lprocfs_read_lru_size;
- lock_vars[0].write_fptr = lprocfs_write_lru_size;
+ lock_vars[0].read_fptr = lprocfs_rd_lru_size;
+ lock_vars[0].write_fptr = lprocfs_wr_lru_size;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_max_age",
ns->ns_name);
lock_vars[0].data = &ns->ns_max_age;
- lock_vars[0].read_fptr = lprocfs_uint_rd;
- lock_vars[0].write_fptr = lprocfs_uint_wr;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
+ lock_vars[0].write_fptr = lprocfs_wr_uint;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
} else {
snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes",
ns->ns_name);
lock_vars[0].data = &ns->ns_max_nolock_size;
- lock_vars[0].read_fptr = lprocfs_uint_rd;
- lock_vars[0].write_fptr = lprocfs_uint_wr;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
+ lock_vars[0].write_fptr = lprocfs_wr_uint;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
snprintf(lock_name, MAX_STRING_SIZE, "%s/contention_seconds",
ns->ns_name);
lock_vars[0].data = &ns->ns_contention_time;
- lock_vars[0].read_fptr = lprocfs_uint_rd;
- lock_vars[0].write_fptr = lprocfs_uint_wr;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
+ lock_vars[0].write_fptr = lprocfs_wr_uint;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
snprintf(lock_name, MAX_STRING_SIZE, "%s/contended_locks",
ns->ns_name);
lock_vars[0].data = &ns->ns_contended_locks;
- lock_vars[0].read_fptr = lprocfs_uint_rd;
- lock_vars[0].write_fptr = lprocfs_uint_wr;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
+ lock_vars[0].write_fptr = lprocfs_wr_uint;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
}
}
#define ldlm_proc_namespace(ns) do {} while (0)
#endif /* LPROCFS */
-struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client)
+static atomic_t *ldlm_namespace_nr(ldlm_side_t client)
+{
+ return client == LDLM_NAMESPACE_SERVER ?
+ &ldlm_srv_namespace_nr : &ldlm_cli_namespace_nr;
+}
+
+struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client,
+ ldlm_appetite_t apt)
{
struct ldlm_namespace *ns = NULL;
struct list_head *bucket;
- int rc;
+ int rc, idx;
ENTRY;
- rc = ldlm_get_ref();
+ rc = ldlm_get_ref(client);
if (rc) {
CERROR("ldlm_get_ref failed: %d\n", rc);
RETURN(NULL);
}
- OBD_ALLOC(ns, sizeof(*ns));
+ OBD_ALLOC_PTR(ns);
if (!ns)
GOTO(out_ref, NULL);
if (!ns->ns_name)
GOTO(out_hash, NULL);
+ ns->ns_appetite = apt;
strcpy(ns->ns_name, name);
CFS_INIT_LIST_HEAD(&ns->ns_root_list);
ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE;
spin_lock_init(&ns->ns_unused_lock);
+ ns->ns_connect_flags = 0;
mutex_down(&ldlm_namespace_lock);
list_add(&ns->ns_list_chain, &ldlm_namespace_list);
+ idx = atomic_read(ldlm_namespace_nr(client));
+ atomic_inc(ldlm_namespace_nr(client));
mutex_up(&ldlm_namespace_lock);
+
ldlm_proc_namespace(ns);
+
+ rc = ldlm_pool_init(&ns->ns_pool, ns, idx, client);
+ if (rc) {
+ CERROR("can't initialize lock pool, rc %d\n", rc);
+ GOTO(out_del, rc);
+ }
RETURN(ns);
+out_del:
+ mutex_down(&ldlm_namespace_lock);
+ list_del(&ns->ns_list_chain);
+ atomic_dec(ldlm_namespace_nr(client));
+ mutex_up(&ldlm_namespace_lock);
out_hash:
POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
out_ns:
- OBD_FREE(ns, sizeof(*ns));
+ OBD_FREE_PTR(ns);
out_ref:
- ldlm_put_ref(0);
+ ldlm_put_ref(client, 0);
RETURN(NULL);
}
mutex_down(&ldlm_namespace_lock);
list_del(&ns->ns_list_chain);
+ atomic_dec(ldlm_namespace_nr(ns->ns_client));
+ ldlm_pool_fini(&ns->ns_pool);
mutex_up(&ldlm_namespace_lock);
/* At shutdown time, don't call the cancellation callback */
int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force)
{
+ ldlm_side_t client;
ENTRY;
if (!ns)
RETURN(ELDLM_OK);
}
}
#endif
-
+ client = ns->ns_client;
POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE);
OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1);
- OBD_FREE(ns, sizeof(*ns));
-
- ldlm_put_ref(force);
+ OBD_FREE_PTR(ns);
+ ldlm_put_ref(client, force);
RETURN(ELDLM_OK);
}
data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_IBITS |
OBD_CONNECT_JOIN | OBD_CONNECT_ATTRFID | OBD_CONNECT_NODEVOH |
OBD_CONNECT_CANCELSET | OBD_CONNECT_AT;
+#ifdef HAVE_LRU_RESIZE_SUPPORT
+ data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
+#endif
#ifdef CONFIG_FS_POSIX_ACL
data->ocd_connect_flags |= OBD_CONNECT_ACL;
#endif
OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE |
OBD_CONNECT_SRVLOCK | OBD_CONNECT_CANCELSET | OBD_CONNECT_AT;
+#ifdef HAVE_LRU_RESIZE_SUPPORT
+ data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE;
+#endif
+
CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d "
"ocd_grant: %d\n", data->ocd_connect_flags,
data->ocd_version, data->ocd_grant);
mds->mds_evict_ost_nids = 1;
sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
- obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+ obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER,
+ LDLM_NAMESPACE_GREEDY);
if (obd->obd_namespace == NULL) {
mds_cleanup(obd);
GOTO(err_ops, rc = -ENOMEM);
GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
/* namespace for mgs llog */
- obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER);
- if (obd->obd_namespace == NULL) {
- mgs_cleanup(obd);
+ obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER,
+ LDLM_NAMESPACE_MODEST);
+ if (obd->obd_namespace == NULL)
GOTO(err_ops, rc = -ENOMEM);
- }
/* ldlm setup */
ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
struct mgs_obd *mgs = &obd->u.mgs;
ENTRY;
- ping_evictor_stop();
-
if (mgs->mgs_sb == NULL)
RETURN(0);
+ ping_evictor_stop();
+
ptlrpc_unregister_service(mgs->mgs_service);
mgs_cleanup_fsdb_list(obd);
#if defined(LPROCFS)
+#define MAX_STRING_SIZE 128
+
/* for bug 10866, global variable */
DECLARE_RWSEM(_lprocfs_lock);
EXPORT_SYMBOL(_lprocfs_lock);
}
/* Generic callbacks */
+int lprocfs_rd_uint(char *page, char **start, off_t off,
+ int count, int *eof, void *data)
+{
+ unsigned int *temp = (unsigned int *)data;
+ return snprintf(page, count, "%u\n", *temp);
+}
+
+int lprocfs_wr_uint(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ unsigned *p = data;
+ char dummy[MAX_STRING_SIZE + 1], *end;
+ unsigned long tmp, nob;
+
+ /* Copy only as much as the caller actually wrote and NUL-terminate
+ * it before parsing. */
+ nob = min_t(unsigned long, count, MAX_STRING_SIZE);
+ if (copy_from_user(dummy, buffer, nob))
+ return -EFAULT;
+ dummy[nob] = '\0';
+
+ tmp = simple_strtoul(dummy, &end, 0);
+ if (dummy == end)
+ return -EINVAL;
+
+ *p = (unsigned int)tmp;
+ return count;
+}
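
For context, these generic handlers are intended to be hooked up through a
struct lprocfs_vars entry, the same way the namespace tunables above wire up
lru_size and lru_max_age. A minimal sketch follows; the name field and the
zero-terminated array convention are assumptions based on the usage visible
in this patch, and my_tunable/parent_proc_dir are hypothetical:

    /* Sketch: expose an unsigned int through lprocfs with the new
     * generic read/write handlers. */
    static unsigned int my_tunable = 42;          /* hypothetical variable */

    static struct lprocfs_vars my_vars[] = {
            { .name       = "my_tunable",         /* /proc entry name (assumed field) */
              .read_fptr  = lprocfs_rd_uint,      /* prints "%u\n" */
              .write_fptr = lprocfs_wr_uint,      /* parses with simple_strtoul() */
              .data       = &my_tunable },
            { 0 }
    };

    /* Registered once against a proc directory, e.g.:
     *      lprocfs_add_vars(parent_proc_dir, my_vars, 0);
     */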
int lprocfs_rd_u64(char *page, char **start, off_t off,
int count, int *eof, void *data)
return snprintf(page, count, "%d\n", atomic_read(atom));
}
+int lprocfs_wr_atomic(struct file *file, const char *buffer,
+ unsigned long count, void *data)
+{
+ atomic_t *atm = data;
+ int val = 0;
+ int rc;
+
+ rc = lprocfs_write_helper(buffer, count, &val);
+ if (rc < 0)
+ return rc;
+
+ if (val <= 0)
+ return -ERANGE;
+
+ atomic_set(atm, val);
+ return count;
+}
+
int lprocfs_rd_uuid(char *page, char **start, off_t off, int count,
int *eof, void *data)
{
"early_lock_cancel",
"size_on_mds",
"adaptive_timeout",
+ "lru_resize",
NULL
};
EXPORT_SYMBOL(lprocfs_rd_u64);
EXPORT_SYMBOL(lprocfs_rd_atomic);
+EXPORT_SYMBOL(lprocfs_wr_atomic);
+EXPORT_SYMBOL(lprocfs_rd_uint);
+EXPORT_SYMBOL(lprocfs_wr_uint);
EXPORT_SYMBOL(lprocfs_rd_uuid);
EXPORT_SYMBOL(lprocfs_rd_name);
EXPORT_SYMBOL(lprocfs_rd_fstype);
obd->u.echo.eo_lastino = ECHO_INIT_OBJID;
obd->obd_namespace = ldlm_namespace_new("echo-tgt",
- LDLM_NAMESPACE_SERVER);
+ LDLM_NAMESPACE_SERVER,
+ LDLM_NAMESPACE_GREEDY);
if (obd->obd_namespace == NULL) {
LBUG();
RETURN(-ENOMEM);
filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT;
sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
- obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+ obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER,
+ LDLM_NAMESPACE_GREEDY);
if (obd->obd_namespace == NULL)
GOTO(err_post, rc = -ENOMEM);
obd->obd_namespace->ns_lvbp = obd;
ldlm_objs += $(LDLM)ldlm_plain.o $(LDLM)ldlm_extent.o
ldlm_objs += $(LDLM)ldlm_request.o $(LDLM)ldlm_lockd.o
ldlm_objs += $(LDLM)ldlm_flock.o $(LDLM)ldlm_inodebits.o
+ldlm_objs += $(LDLM)ldlm_pool.o
ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o
ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o
ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o
$(top_srcdir)/lustre/ldlm/ldlm_lockd.c \
$(top_srcdir)/lustre/ldlm/ldlm_internal.h \
$(top_srcdir)/lustre/ldlm/ldlm_inodebits.c \
- $(top_srcdir)/lustre/ldlm/ldlm_flock.c
+ $(top_srcdir)/lustre/ldlm/ldlm_flock.c \
+ $(top_srcdir)/lustre/ldlm/ldlm_pool.c
COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \
events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c \
}
rc = ptlrpc_check_status(req);
-
- /* Either we've been evicted, or the server has failed for
- * some reason. Try to reconnect, and if that fails, punt to the
- * upcall. */
- if ((rc == -ENOTCONN) || (rc == -ENODEV)) {
- if (req->rq_send_state != LUSTRE_IMP_FULL ||
- imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
- RETURN(-ENOTCONN);
+ if (rc) {
+ /* Either we've been evicted, or the server has failed for
+ * some reason. Try to reconnect, and if that fails, punt to
+ * the upcall. */
+ if (rc == -ENOTCONN || rc == -ENODEV) {
+ if (req->rq_send_state != LUSTRE_IMP_FULL ||
+ imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) {
+ RETURN(-ENOTCONN);
+ }
+ ptlrpc_request_handle_notconn(req);
+ RETURN(rc);
+ }
+ } else {
+ /* Check whether the server piggy-backed an SLV on the reply.
+ * Do this only for RPCs that completed with rc == 0. */
+ if (imp->imp_obd->obd_namespace) {
+ /* The disconnect RPC is sent after the namespace has
+ * already been destroyed; the check above skips the
+ * pool update in that case. */
+ ldlm_cli_update_pool(req);
}
-
- ptlrpc_request_handle_notconn(req);
-
- RETURN(rc);
}
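
The pool update called here is implemented in the new ldlm_pool.c added by
this patch; conceptually it copies the SLV and lock limit the server
piggy-backed on the reply into the client namespace's pool. A rough sketch of
the idea (the pl_* field names and the lock are illustrative assumptions, not
the actual ldlm_pool.c code):

    static void ldlm_cli_update_pool_sketch(struct ptlrpc_request *req)
    {
            struct obd_device *obd = req->rq_import->imp_obd;
            struct ldlm_namespace *ns = obd->obd_namespace;

            /* Nothing to do unless lru resize was negotiated at connect. */
            if (!(ns->ns_connect_flags & OBD_CONNECT_LRU_RESIZE))
                    return;

            spin_lock(&ns->ns_pool.pl_lock);                      /* assumed */
            ns->ns_pool.pl_server_lock_volume =
                    lustre_msg_get_slv(req->rq_repmsg);           /* new getter */
            ns->ns_pool.pl_limit = lustre_msg_get_limit(req->rq_repmsg);
            spin_unlock(&ns->ns_pool.pl_lock);
    }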
/* Store transno in reqmsg for replay. */
ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd),
lustre_swab_connect);
-
spin_lock(&imp->imp_lock);
list_del(&imp->imp_conn_current->oic_item);
list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list);
GOTO(out, rc = -ENODEV);
}
exp->exp_connect_flags = ocd->ocd_connect_flags;
+ imp->imp_obd->obd_self_export->exp_connect_flags = ocd->ocd_connect_flags;
class_export_put(exp);
obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD);
ocd->ocd_brw_size >> CFS_PAGE_SHIFT;
}
+ imp->imp_obd->obd_namespace->ns_connect_flags = ocd->ocd_connect_flags;
+
if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) &&
(imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2))
/* We need a per-message support flag, because
}
}
+__u64 lustre_msg_get_slv(struct lustre_msg *msg)
+{
+ switch (msg->lm_magic) {
+ case LUSTRE_MSG_MAGIC_V1:
+ case LUSTRE_MSG_MAGIC_V1_SWABBED:
+ return 1;
+ case LUSTRE_MSG_MAGIC_V2:
+ case LUSTRE_MSG_MAGIC_V2_SWABBED: {
+ struct ptlrpc_body *pb;
+
+ pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ if (!pb) {
+ CERROR("invalid msg %p: no ptlrpc body!\n", msg);
+ return -EINVAL;
+ }
+ return pb->pb_slv;
+ }
+ default:
+ CERROR("invalid msg magic %x\n", msg->lm_magic);
+ return -EINVAL;
+ }
+}
+
+void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv)
+{
+ switch (msg->lm_magic) {
+ case LUSTRE_MSG_MAGIC_V1:
+ case LUSTRE_MSG_MAGIC_V1_SWABBED:
+ return;
+ case LUSTRE_MSG_MAGIC_V2:
+ case LUSTRE_MSG_MAGIC_V2_SWABBED: {
+ struct ptlrpc_body *pb;
+
+ pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ if (!pb) {
+ CERROR("invalid msg %p: no ptlrpc body!\n", msg);
+ return;
+ }
+ pb->pb_slv = slv;
+ return;
+ }
+ default:
+ CERROR("invalid msg magic %x\n", msg->lm_magic);
+ return;
+ }
+}
+
+__u32 lustre_msg_get_limit(struct lustre_msg *msg)
+{
+ switch (msg->lm_magic) {
+ case LUSTRE_MSG_MAGIC_V1:
+ case LUSTRE_MSG_MAGIC_V1_SWABBED:
+ return 1;
+ case LUSTRE_MSG_MAGIC_V2:
+ case LUSTRE_MSG_MAGIC_V2_SWABBED: {
+ struct ptlrpc_body *pb;
+
+ pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ if (!pb) {
+ CERROR("invalid msg %p: no ptlrpc body!\n", msg);
+ return -EINVAL;
+ }
+ return pb->pb_limit;
+ }
+ default:
+ CERROR("invalid msg magic %x\n", msg->lm_magic);
+ return -EINVAL;
+ }
+}
+
+void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit)
+{
+ switch (msg->lm_magic) {
+ case LUSTRE_MSG_MAGIC_V1:
+ case LUSTRE_MSG_MAGIC_V1_SWABBED:
+ return;
+ case LUSTRE_MSG_MAGIC_V2:
+ case LUSTRE_MSG_MAGIC_V2_SWABBED: {
+ struct ptlrpc_body *pb;
+
+ pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb));
+ if (!pb) {
+ CERROR("invalid msg %p: no ptlrpc body!\n", msg);
+ return;
+ }
+ pb->pb_limit = limit;
+ return;
+ }
+ default:
+ CERROR("invalid msg magic %x\n", msg->lm_magic);
+ return;
+ }
+}
+
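
The setters above are the server-side counterpart: before a reply goes out,
the server stamps its current SLV and lock limit into the ptlrpc_body so the
client pool can pick them up. An illustrative sketch (ldlm_pool_get_slv() and
ldlm_pool_get_limit() are assumed helpers, not functions introduced in this
hunk):

    static void ptlrpc_reply_stamp_pool_sketch(struct ptlrpc_request *req,
                                               struct ldlm_pool *pl)
    {
            lustre_msg_set_slv(req->rq_repmsg, ldlm_pool_get_slv(pl));
            lustre_msg_set_limit(req->rq_repmsg, ldlm_pool_get_limit(pl));
    }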
__u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg)
{
switch (msg->lm_magic) {
__swab32s (&b->pb_conn_cnt);
__swab32s (&b->pb_timeout);
__swab32s (&b->pb_service_time);
- CLASSERT(offsetof(typeof(*b), pb_padding_1) != 0);
- CLASSERT(offsetof(typeof(*b), pb_padding_2) != 0);
- CLASSERT(offsetof(typeof(*b), pb_padding_3) != 0);
+ __swab64s (&b->pb_slv);
+ __swab32s (&b->pb_limit);
}
void lustre_swab_connect(struct obd_connect_data *ocd)
int rc = 0;
ENTRY;
- req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL);
+ req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING,
+ 1, NULL, NULL);
if (req) {
DEBUG_REQ(D_INFO, req, "pinging %s->%s",
imp->imp_obd->obd_uuid.uuid,
EXPORT_SYMBOL(lustre_msg_get_last_committed);
EXPORT_SYMBOL(lustre_msg_get_transno);
EXPORT_SYMBOL(lustre_msg_get_status);
+EXPORT_SYMBOL(lustre_msg_get_slv);
+EXPORT_SYMBOL(lustre_msg_get_limit);
+EXPORT_SYMBOL(lustre_msg_set_slv);
+EXPORT_SYMBOL(lustre_msg_set_limit);
EXPORT_SYMBOL(lustre_msg_get_conn_cnt);
EXPORT_SYMBOL(lustre_msg_is_v1);
EXPORT_SYMBOL(lustre_msg_get_magic);
{
/* Wire protocol assertions generated by 'wirecheck'
* (make -C lustre/utils newwirecheck)
- * running on Linux pancake 2.6.18-skas3-v9-pre9 #1 Tue Feb 20 10:37:58 PST 2007 i686 i686 i3
- * with gcc version 3.4.4 */
+ * running on Linux hideous 2.6.9-prep.qp3.5.34.4qsnet #3 Mon Aug 13 08:38:55 EEST 2007 i686
+ * with gcc version 3.4.6 20060404 (Red Hat 3.4.6-3) */
/* Constants... */
(long long)(int)offsetof(struct ptlrpc_body, pb_service_time));
LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_service_time) == 4, " found %lld\n",
(long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_service_time));
- LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_1) == 76, " found %lld\n",
- (long long)(int)offsetof(struct ptlrpc_body, pb_padding_1));
- LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_1) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_1));
- LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_2) == 80, " found %lld\n",
- (long long)(int)offsetof(struct ptlrpc_body, pb_padding_2));
- LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_2) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_2));
- LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_3) == 84, " found %lld\n",
- (long long)(int)offsetof(struct ptlrpc_body, pb_padding_3));
- LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_3) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_3));
+ LASSERTF((int)offsetof(struct ptlrpc_body, pb_slv) == 80, " found %lld\n",
+ (long long)(int)offsetof(struct ptlrpc_body, pb_slv));
+ LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_slv) == 8, " found %lld\n",
+ (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_slv));
+ LASSERTF((int)offsetof(struct ptlrpc_body, pb_limit) == 76, " found %lld\n",
+ (long long)(int)offsetof(struct ptlrpc_body, pb_limit));
+ LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit));
/* Checks for struct obd_connect_data */
LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n",
CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL);
CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
+ CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
/* Checks for struct obdo */
LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
LASSERTF((int)sizeof(((struct lustre_disk_data *)0)->ldd_params) == 4096, " found %lld\n",
(long long)(int)sizeof(((struct lustre_disk_data *)0)->ldd_params));
}
-
}
run_test 119b "Sparse directIO read must return actual read amount"
+LDLM_POOL_CTL_RECALC=1
+LDLM_POOL_CTL_SHRINK=2
+
+disable_pool_recalc() {
+ NSDIR=`find $LPROC/ldlm/namespaces | grep $1 | head -1`
+ if test -f $NSDIR/pool/control; then
+ NS=`basename $NSDIR`
+ echo "disable pool recalc for $NS pool"
+ CONTROL=`cat $NSDIR/pool/control`
+ CONTROL=$((CONTROL & ~LDLM_POOL_CTL_RECALC))
+ echo "$CONTROL" > $NSDIR/pool/control
+ fi
+}
+
+enable_pool_recalc() {
+ NSDIR=`find $LPROC/ldlm/namespaces | grep $1 | head -1`
+ if test -f $NSDIR/pool/control; then
+ NS=`basename $NSDIR`
+ echo "enable pool recalc $NS pool"
+ CONTROL=`cat $NSDIR/pool/control`
+ CONTROL=$((CONTROL | LDLM_POOL_CTL_RECALC))
+ echo "$CONTROL" > $NSDIR/pool/control
+ fi
+}
+
+disable_pool_shrink() {
+ NSDIR=`find $LPROC/ldlm/namespaces | grep $1 | head -1`
+ if test -f $NSDIR/pool/control; then
+ NS=`basename $NSDIR`
+ echo "disable pool shrink for $NS pool"
+ CONTROL=`cat $NSDIR/pool/control`
+ CONTROL=$((CONTROL & ~LDLM_POOL_CTL_SHRINK))
+ echo "$CONTROL" > $NSDIR/pool/control
+ fi
+}
+
+enable_pool_shrink() {
+ NSDIR=`find $LPROC/ldlm/namespaces | grep $1 | head -1`
+ if test -f $NSDIR/pool/control; then
+ NS=`basename $NSDIR`
+ echo "enable pool shrink for $NS pool"
+ CONTROL=`cat $NSDIR/pool/control`
+ CONTROL=$((CONTROL | LDLM_POOL_CTL_SHRINK))
+ echo "$CONTROL" > $NSDIR/pool/control
+ fi
+}
+
test_120a() {
+ disable_pool_recalc mdc
+ disable_pool_shrink mdc
+ disable_pool_shrink "mds-$FSNAME"
mkdir $DIR/$tdir
cancel_lru_locks mdc
stat $DIR/$tdir > /dev/null
run_test 120a "Early Lock Cancel: mkdir test ==================="
test_120b() {
+ disable_pool_recalc mdc
+ disable_pool_shrink mdc
+ disable_pool_shrink "mds-$FSNAME"
mkdir $DIR/$tdir
cancel_lru_locks mdc
stat $DIR/$tdir > /dev/null
run_test 120b "Early Lock Cancel: create test =================="
test_120c() {
+ disable_pool_recalc mdc
+ disable_pool_shrink mdc
+ disable_pool_shrink "mds-$FSNAME"
mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2
touch $DIR/$tdir/d1/f1
cancel_lru_locks mdc
run_test 120c "Early Lock Cancel: link test ===================="
test_120d() {
+ disable_pool_recalc mdc
+ disable_pool_shrink mdc
+ disable_pool_shrink "mds-$FSNAME"
touch $DIR/$tdir
cancel_lru_locks mdc
stat $DIR/$tdir > /dev/null
run_test 120d "Early Lock Cancel: setattr test ================="
test_120e() {
+ disable_pool_recalc mdc
+ disable_pool_shrink mdc
+ disable_pool_shrink "mds-$FSNAME"
mkdir $DIR/$tdir
dd if=/dev/zero of=$DIR/$tdir/f1 count=1
cancel_lru_locks mdc
run_test 120e "Early Lock Cancel: unlink test =================="
test_120f() {
+ disable_pool_recalc mdc
+ disable_pool_shrink mdc
+ disable_pool_shrink "mds-$FSNAME"
mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2
dd if=/dev/zero of=$DIR/$tdir/d1/f1 count=1
dd if=/dev/zero of=$DIR/$tdir/d2/f2 count=1
run_test 120f "Early Lock Cancel: rename test =================="
test_120g() {
+ disable_pool_recalc mdc
+ disable_pool_shrink mdc
+ disable_pool_shrink "mds-$FSNAME"
count=10000
echo create $count files
mkdir $DIR/$tdir
}
run_test 123 "verify statahead work"
+test_124() {
+ NSDIR=`find $LPROC/ldlm/namespaces | grep mdc | head -1`
+
+ if ! test -f $NSDIR/pool/stats; then
+ skip "lru resize is not enabled!"
+ return
+ fi
+
+ enable_pool_recalc mdc
+ disable_pool_shrink "mds-$FSNAME"
+ disable_pool_shrink mdc
+
+ LIMIT=`cat $NSDIR/pool/limit`
+ LIMIT=$(($LIMIT+$LIMIT*5/100))
+ mkdir $DIR/$tdir
+ log "create $LIMIT files at $DIR/$tdir"
+ createmany -o $DIR/$tdir/f $LIMIT
+ ls -la $DIR/$tdir
+
+ LRU_SIZE_B=`cat $NSDIR/lru_size`
+ log "created $LRU_SIZE_B locks"
+
+ # Locks live at most 10h on clients. To have them all expire within 2
+ # minutes, speed up lock aging by lock_volume_factor = (10h * 60m/h) / 2m == 300.
+ log "make client drop locks 300 times faster so that 2m wait is enough"
+ echo "300" > $NSDIR/pool/lock_volume_factor
+ log "sleep for 2m"
+ sleep 2m
+ LRU_SIZE_A=`cat $NSDIR/lru_size`
+ echo "1" > $NSDIR/pool/lock_volume_factor
+
+ [ $LRU_SIZE_B -gt $LRU_SIZE_A ] || {
+ error "No locks dropped in 2m. LRU size: $LRU_SIZE_A"
+ enable_pool_shrink mdc
+ return
+ }
+
+ log "Dropped "$((LRU_SIZE_B-LRU_SIZE_A))" locks in 2m"
+ enable_pool_shrink mdc
+ log "unlink $LIMIT files at $DIR/$tdir"
+ unlinkmany $DIR/$tdir/f $LIMIT > /dev/null 2>&1
+}
+run_test 124 "lru resize ======================================="
+
TMPDIR=$OLDTMPDIR
TMP=$OLDTMP
HOME=$OLDHOME
*
*/
-
-
#include <stdlib.h>
#include <stdio.h>
#include <lnet/lnetctl.h>
CHECK_MEMBER(ptlrpc_body, pb_conn_cnt);
CHECK_MEMBER(ptlrpc_body, pb_timeout);
CHECK_MEMBER(ptlrpc_body, pb_service_time);
- CHECK_MEMBER(ptlrpc_body, pb_padding_1);
- CHECK_MEMBER(ptlrpc_body, pb_padding_2);
- CHECK_MEMBER(ptlrpc_body, pb_padding_3);
+ CHECK_MEMBER(ptlrpc_body, pb_slv);
+ CHECK_MEMBER(ptlrpc_body, pb_limit);
}
static void check_obd_connect_data(void)
CHECK_CDEFINE(OBD_CONNECT_CANCELSET);
CHECK_CDEFINE(OBD_CONNECT_SOM);
CHECK_CDEFINE(OBD_CONNECT_AT);
+ CHECK_CDEFINE(OBD_CONNECT_LRU_RESIZE);
}
static void
{
/* Wire protocol assertions generated by 'wirecheck'
* (make -C lustre/utils newwirecheck)
- * running on Linux pancake 2.6.18-skas3-v9-pre9 #1 Tue Feb 20 10:37:58 PST 2007 i686 i686 i3
- * with gcc version 3.4.4 */
+ * running on Linux hideous 2.6.9-prep.qp3.5.34.4qsnet #3 Mon Aug 13 08:38:55 EEST 2007 i686
+ * with gcc version 3.4.6 20060404 (Red Hat 3.4.6-3) */
/* Constants... */
(long long)(int)offsetof(struct ptlrpc_body, pb_service_time));
LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_service_time) == 4, " found %lld\n",
(long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_service_time));
- LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_1) == 76, " found %lld\n",
- (long long)(int)offsetof(struct ptlrpc_body, pb_padding_1));
- LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_1) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_1));
- LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_2) == 80, " found %lld\n",
- (long long)(int)offsetof(struct ptlrpc_body, pb_padding_2));
- LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_2) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_2));
- LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_3) == 84, " found %lld\n",
- (long long)(int)offsetof(struct ptlrpc_body, pb_padding_3));
- LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_3) == 4, " found %lld\n",
- (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_3));
+ LASSERTF((int)offsetof(struct ptlrpc_body, pb_slv) == 80, " found %lld\n",
+ (long long)(int)offsetof(struct ptlrpc_body, pb_slv));
+ LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_slv) == 8, " found %lld\n",
+ (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_slv));
+ LASSERTF((int)offsetof(struct ptlrpc_body, pb_limit) == 76, " found %lld\n",
+ (long long)(int)offsetof(struct ptlrpc_body, pb_limit));
+ LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n",
+ (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit));
/* Checks for struct obd_connect_data */
LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n",
CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL);
CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL);
CLASSERT(OBD_CONNECT_AT == 0x01000000ULL);
+ CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL);
/* Checks for struct obdo */
LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n",
LASSERTF((int)sizeof(((struct lustre_disk_data *)0)->ldd_params) == 4096, " found %lld\n",
(long long)(int)sizeof(((struct lustre_disk_data *)0)->ldd_params));
}
-