From 7a75fa457395445e2f54e2d5ebb47bd659d5778f Mon Sep 17 00:00:00 2001
From: yury
Date: Fri, 7 Sep 2007 09:32:25 +0000
Subject: [PATCH] b=2262 r=adilger,vitaly,nikita - landed lru resize.
---
 lustre/ChangeLog                   |    8 +
 lustre/autoconf/lustre-core.m4     |   13 +
 lustre/include/lprocfs_status.h    |    8 +-
 lustre/include/lustre/lustre_idl.h |   15 +-
 lustre/include/lustre_dlm.h        |  115 ++++-
 lustre/include/lustre_export.h     |   18 +
 lustre/include/lustre_lib.h        |    1 +
 lustre/include/lustre_net.h        |    4 +
 lustre/ldlm/Makefile.am            |    2 +-
 lustre/ldlm/ldlm_internal.h        |    6 +-
 lustre/ldlm/ldlm_lib.c             |   35 +-
 lustre/ldlm/ldlm_lock.c            |   13 +-
 lustre/ldlm/ldlm_lockd.c           |   35 +-
 lustre/ldlm/ldlm_pool.c            |  948 +++++++++++++++++++++++++++++++++++++
 lustre/ldlm/ldlm_request.c         |  116 ++++-
 lustre/ldlm/ldlm_resource.c        |  146 +++---
 lustre/llite/llite_lib.c           |    7 +
 lustre/mds/handler.c               |    3 +-
 lustre/mgs/mgs_handler.c           |   11 +-
 lustre/obdclass/lprocfs_status.c   |   49 ++
 lustre/obdecho/echo.c              |    3 +-
 lustre/obdfilter/filter.c          |    3 +-
 lustre/ptlrpc/Makefile.in          |    1 +
 lustre/ptlrpc/autoMakefile.am      |    3 +-
 lustre/ptlrpc/client.c             |   32 +-
 lustre/ptlrpc/import.c             |    4 +-
 lustre/ptlrpc/pack_generic.c       |  101 +++-
 lustre/ptlrpc/pinger.c             |    3 +-
 lustre/ptlrpc/ptlrpc_module.c      |    4 +
 lustre/ptlrpc/wiretest.c           |   26 +-
 lustre/tests/sanity.sh             |  113 +++++
 lustre/utils/lctl.c                |    2 -
 lustre/utils/wirecheck.c           |    6 +-
 lustre/utils/wiretest.c            |   26 +-
 34 files changed, 1698 insertions(+), 182 deletions(-)
 create mode 100644 lustre/ldlm/ldlm_pool.c

diff --git a/lustre/ChangeLog b/lustre/ChangeLog
index 73f0da6..6e143f2 100644
--- a/lustre/ChangeLog
+++ b/lustre/ChangeLog
@@ -156,6 +156,14 @@ Bugzilla   : 13125
 Description: osts not allocated evenly to files
 Details    : change the condition to increase offset_idx
 
+Severity   : enhancement
+Bugzilla   : 2262
+Description: self-adjusting client lru lists
+Details    : use an adaptive algorithm to manage the lru lists of locks
+             cached on clients, according to the current server load, other
+             clients' work patterns, memory activity, etc. Both server- and
+             client-side namespaces provide a number of proc tunables for
+             controlling this behavior.
 
 --------------------------------------------------------------------------------
 
 2007-08-27  Cluster File Systems, Inc.
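Before the file-by-file changes, the shape of the feature in one screen: the server computes an SLV (server lock volume) from its locking load and ships it to clients in RPC replies; each client computes a per-lock CLV (lock volume factor x lock age x LRU length) and cancels cached locks whose volume reaches the SLV. The following standalone sketch is illustrative only, not code from this patch; should_cancel() is a made-up helper that mirrors the check added to ldlm_cancel_lru_local() in lustre/ldlm/ldlm_request.c further down.

    #include <stdio.h>

    typedef unsigned long long __u64;

    /* Mirror of the LRU-resize decision: a lock whose volume
     * lv = lvf * age * unused reaches the server's SLV is canceled.
     * An SLV of 1 means "no SLV received from the server yet". */
    static int should_cancel(__u64 slv, __u64 lvf, __u64 age_sec, __u64 unused)
    {
            __u64 lv;

            if (slv == 1)
                    return 0;       /* no SLV from the server yet */
            if (age_sec == 0)
                    age_sec = 1;    /* clamp zero age, as the patch does */
            lv = lvf * age_sec * unused;
            return lv >= slv;
    }

    int main(void)
    {
            /* Hypothetical numbers: 1000 unused locks, oldest lock 30s old,
             * LVF = 1, so lv = 30000; canceled once SLV <= 30000. */
            printf("slv=40000 -> cancel=%d\n", should_cancel(40000, 1, 30, 1000));
            printf("slv=20000 -> cancel=%d\n", should_cancel(20000, 1, 30, 1000));
            return 0;
    }

A lower SLV means a more loaded server, so locks are shed sooner; an idle server publishes a high SLV and clients may cache locks for a long time.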
diff --git a/lustre/autoconf/lustre-core.m4 b/lustre/autoconf/lustre-core.m4 index ff515c5..e65ad06 100644 --- a/lustre/autoconf/lustre-core.m4 +++ b/lustre/autoconf/lustre-core.m4 @@ -1154,6 +1154,7 @@ LC_CONFIG_CHECKSUM LC_CONFIG_LIBLUSTRE_RECOVERY LC_CONFIG_QUOTA LC_CONFIG_HEALTH_CHECK_WRITE +LC_CONFIG_LRU_RESIZE LC_TASK_PPTR # RHEL4 patches @@ -1282,6 +1283,18 @@ LC_CONFIG_PINGER LC_CONFIG_LIBLUSTRE_RECOVERY ]) +AC_DEFUN([LC_CONFIG_LRU_RESIZE], +[AC_MSG_CHECKING([whether to enable lru self-adjusting]) +AC_ARG_ENABLE([lru_resize], + AC_HELP_STRING([--enable-lru-resize], + [enable lru resize support]), + [],[enable_lru_resize='yes']) +AC_MSG_RESULT([$enable_lru_resize]) +if test x$enable_lru_resize != xno; then + AC_DEFINE(HAVE_LRU_RESIZE_SUPPORT, 1, [Enable lru resize support]) +fi +]) + # # LC_CONFIG_QUOTA # diff --git a/lustre/include/lprocfs_status.h b/lustre/include/lprocfs_status.h index da13141..3f9ce4d 100644 --- a/lustre/include/lprocfs_status.h +++ b/lustre/include/lprocfs_status.h @@ -273,7 +273,13 @@ extern int lprocfs_obd_seq_create(struct obd_device *dev, char *name, extern int lprocfs_rd_u64(char *page, char **start, off_t off, int count, int *eof, void *data); extern int lprocfs_rd_atomic(char *page, char **start, off_t off, - int count, int *eof, void *data); + int count, int *eof, void *data); +extern int lprocfs_wr_atomic(struct file *file, const char *buffer, + unsigned long count, void *data); +extern int lprocfs_rd_uint(char *page, char **start, off_t off, + int count, int *eof, void *data); +extern int lprocfs_wr_uint(struct file *file, const char *buffer, + unsigned long count, void *data); extern int lprocfs_rd_uuid(char *page, char **start, off_t off, int count, int *eof, void *data); extern int lprocfs_rd_name(char *page, char **start, off_t off, diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index bb4a2af..e46042e 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -211,9 +211,8 @@ struct ptlrpc_body { __u32 pb_conn_cnt; __u32 pb_timeout; /* for req, the deadline, for rep, the service est */ __u32 pb_service_time; /* for rep, actual service time */ - __u32 pb_padding_1; - __u32 pb_padding_2; - __u32 pb_padding_3; + __u32 pb_limit; + __u64 pb_slv; }; extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); @@ -287,6 +286,7 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define OBD_CONNECT_CANCELSET 0x400000ULL /* Early batched cancels. */ #define OBD_CONNECT_SOM 0x00800000ULL /* Size on MDS */ #define OBD_CONNECT_AT 0x01000000ULL /* client uses adaptive timeouts */ +#define OBD_CONNECT_LRU_RESIZE 0x02000000ULL /* Lru resize feature. 
*/ /* also update obd_connect_names[] for lprocfs_rd_connect_flags() * and lustre/utils/wirecheck.c */ @@ -294,12 +294,14 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); OBD_CONNECT_ACL | OBD_CONNECT_XATTR | \ OBD_CONNECT_IBITS | OBD_CONNECT_JOIN | \ OBD_CONNECT_NODEVOH | OBD_CONNECT_ATTRFID | \ - OBD_CONNECT_CANCELSET | OBD_CONNECT_AT) + OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \ + OBD_CONNECT_LRU_RESIZE) #define OST_CONNECT_SUPPORTED (OBD_CONNECT_SRVLOCK | OBD_CONNECT_GRANT | \ OBD_CONNECT_REQPORTAL | OBD_CONNECT_VERSION | \ OBD_CONNECT_TRUNCLOCK | OBD_CONNECT_INDEX | \ OBD_CONNECT_BRW_SIZE | OBD_CONNECT_QUOTA64 | \ - OBD_CONNECT_CANCELSET | OBD_CONNECT_AT) + OBD_CONNECT_CANCELSET | OBD_CONNECT_AT | \ + OBD_CONNECT_LRU_RESIZE) #define ECHO_CONNECT_SUPPORTED (0) #define MGS_CONNECT_SUPPORTED (OBD_CONNECT_VERSION | OBD_CONNECT_AT) @@ -312,9 +314,6 @@ extern void lustre_swab_ptlrpc_body(struct ptlrpc_body *pb); #define OBD_OCD_VERSION_PATCH(version) ((int)((version)>>8)&255) #define OBD_OCD_VERSION_FIX(version) ((int)(version)&255) -#define exp_connect_cancelset(exp) \ - ((exp) ? (exp)->exp_connect_flags & OBD_CONNECT_CANCELSET : 0) - /* This structure is used for both request and reply. * * If we eventually have separate connect data for different types, which we diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index 163b9c58..5cceb70 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -42,8 +42,10 @@ typedef enum { ELDLM_BAD_NAMESPACE = 401 } ldlm_error_t; -#define LDLM_NAMESPACE_SERVER 0 -#define LDLM_NAMESPACE_CLIENT 1 +typedef enum { + LDLM_NAMESPACE_SERVER = 0, + LDLM_NAMESPACE_CLIENT = 1 +} ldlm_side_t; #define LDLM_FL_LOCK_CHANGED 0x000001 /* extent, mode, or resource changed */ @@ -207,10 +209,74 @@ static inline int lockmode_compat(ldlm_mode_t exist, ldlm_mode_t new) * */ +struct ldlm_pool; struct ldlm_lock; struct ldlm_resource; struct ldlm_namespace; +typedef int (*ldlm_pool_recalc_t)(struct ldlm_pool *pl); + +typedef int (*ldlm_pool_shrink_t)(struct ldlm_pool *pl, + int nr, unsigned int gfp_mask); + +enum { + LDLM_POOL_CTL_RECALC = 1 << 0, /* Pool recalc is enabled */ + LDLM_POOL_CTL_SHRINK = 1 << 1, /* Pool shrink is enabled */ + LDLM_POOL_CTL_FULL = (LDLM_POOL_CTL_RECALC | LDLM_POOL_CTL_SHRINK) +}; + +/* One second for pools thread check interval. */ +#define LDLM_POOLS_THREAD_PERIOD (1) + +/* 5% margin for modest pools. See ldlm_pool.c for details. */ +#define LDLM_POOLS_MODEST_MARGIN (5) + +/* A change to SLV in % after which we want to wake up pools thread asap. */ +#define LDLM_POOLS_FAST_SLV_CHANGE (5) + +struct ldlm_pool { + /* Common pool fields */ + cfs_proc_dir_entry_t *pl_proc_dir; /* Pool proc directory. */ + char pl_name[100]; /* Pool name, should be long + * enough to contain complex + * proc entry name. */ + spinlock_t pl_lock; /* Lock for protecting slv/clv + * updates. */ + atomic_t pl_limit; /* Number of allowed locks in + * in pool, both, client and + * server side. */ + atomic_t pl_granted; /* Number of granted locks. */ + atomic_t pl_grant_rate; /* Grant rate per T. */ + atomic_t pl_cancel_rate; /* Cancel rate per T. */ + atomic_t pl_grant_speed; /* Grant speed (GR - CR) per T. */ + __u64 pl_server_lock_volume; /* Server lock volume. Protected + * by pl_lock. */ + cfs_time_t pl_update_time; /* Time when last slv from server + * was obtained. */ + ldlm_pool_recalc_t pl_recalc; /* Recalc callback func pointer. */ + ldlm_pool_shrink_t pl_shrink; /* Shrink callback func pointer. 
*/ + int pl_control; /* Pool features mask */ + + /* Server side pool fields */ + atomic_t pl_grant_plan; /* Planned number of granted + * locks for next T. */ + atomic_t pl_grant_step; /* Grant plan step for next T. */ + + /* Client side pool related fields */ + atomic_t pl_lock_volume_factor; /* Lock volume factor. */ + struct lprocfs_stats *pl_stats; /* Pool statistics. */ +}; + +static inline int pool_recalc_enabled(struct ldlm_pool *pl) +{ + return pl->pl_control & LDLM_POOL_CTL_RECALC; +} + +static inline int pool_shrink_enabled(struct ldlm_pool *pl) +{ + return pl->pl_control & LDLM_POOL_CTL_SHRINK; +} + typedef int (*ldlm_res_policy)(struct ldlm_namespace *, struct ldlm_lock **, void *req_cookie, ldlm_mode_t mode, int flags, void *data); @@ -221,6 +287,11 @@ struct ldlm_valblock_ops { int buf_idx, int increase); }; +typedef enum { + LDLM_NAMESPACE_GREEDY = 1 << 0, + LDLM_NAMESPACE_MODEST = 1 << 1 +} ldlm_appetite_t; + /* default values for the "max_nolock_size", "contention_time" * and "contended_locks" namespace tunables */ #define NS_DEFAULT_MAX_NOLOCK_BYTES 131072 @@ -229,7 +300,9 @@ struct ldlm_valblock_ops { struct ldlm_namespace { char *ns_name; - __u32 ns_client; /* is this a client-side lock tree? */ + ldlm_side_t ns_client; /* is this a client-side lock tree? */ + __u64 ns_connect_flags; /* client side connect flags + * supported by server */ struct list_head *ns_hash; /* hash table for ns */ spinlock_t ns_hash_lock; __u32 ns_refcount; /* count of resources in the hash */ @@ -250,6 +323,9 @@ struct ldlm_namespace { struct ldlm_valblock_ops *ns_lvbo; void *ns_lvbp; cfs_waitq_t ns_waitq; + struct ldlm_pool ns_pool; + ldlm_appetite_t ns_appetite; + /* if more than @ns_contented_locks found, the resource considered * as contended */ unsigned ns_contended_locks; @@ -260,6 +336,12 @@ struct ldlm_namespace { unsigned ns_max_nolock_size; }; +static inline int ns_connect_lru_resize(struct ldlm_namespace *ns) +{ + LASSERT(ns != NULL); + return ns->ns_connect_flags & OBD_CONNECT_LRU_RESIZE; +} + /* * * Resource hash table @@ -490,8 +572,8 @@ int ldlm_request_cancel(struct ptlrpc_request *req, struct ldlm_request *dlm_req, int first); int ldlm_del_waiting_lock(struct ldlm_lock *lock); int ldlm_refresh_waiting_lock(struct ldlm_lock *lock); -int ldlm_get_ref(void); -void ldlm_put_ref(int force); +int ldlm_get_ref(ldlm_side_t client); +void ldlm_put_ref(ldlm_side_t client, int force); /* ldlm_lock.c */ ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res); @@ -568,7 +650,8 @@ void ldlm_lock_dump_handle(int level, struct lustre_handle *); void ldlm_unlink_lock_skiplist(struct ldlm_lock *req); /* resource.c */ -struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 local); +struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, + ldlm_appetite_t apt); int ldlm_namespace_cleanup(struct ldlm_namespace *ns, int flags); int ldlm_namespace_free(struct ldlm_namespace *ns, int force); int ldlm_proc_setup(void); @@ -625,6 +708,7 @@ int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, struct ldlm_res_id res_id, int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, void *data, __u32 data_len); int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags); +int ldlm_cli_update_pool(struct ptlrpc_request *req); int ldlm_cli_cancel(struct lustre_handle *lockh); int ldlm_cli_cancel_unused(struct ldlm_namespace *, struct ldlm_res_id *, int flags, void *opaque); @@ -672,4 +756,23 @@ static inline void check_res_locked(struct 
ldlm_resource *res) struct ldlm_resource * lock_res_and_lock(struct ldlm_lock *lock); void unlock_res_and_lock(struct ldlm_lock *lock); +/* ldlm_pool.c */ +int ldlm_pools_init(ldlm_side_t client); +void ldlm_pools_fini(void); +void ldlm_pools_wakeup(void); +int ldlm_pools_shrink(int nr, unsigned int gfp_mask); + +int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns, + int idx, ldlm_side_t client); +int ldlm_pool_shrink(struct ldlm_pool *pl, int nr, + unsigned int gfp_mask); +void ldlm_pool_fini(struct ldlm_pool *pl); +int ldlm_pool_setup(struct ldlm_pool *pl, __u32 limit); +int ldlm_pool_recalc(struct ldlm_pool *pl); +__u64 ldlm_pool_get_slv(struct ldlm_pool *pl); +__u32 ldlm_pool_get_limit(struct ldlm_pool *pl); +void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv); +void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit); +void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock); +void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock); #endif diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index 39df979..679b451 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -99,6 +99,24 @@ struct obd_export { #define exp_filter_data u.eu_filter_data #define exp_ec_data u.eu_ec_data +static inline int exp_connect_cancelset(struct obd_export *exp) +{ + return exp ? exp->exp_connect_flags & OBD_CONNECT_CANCELSET : 0; +} + +static inline int exp_connect_lru_resize(struct obd_export *exp) +{ + LASSERT(exp != NULL); + return exp->exp_connect_flags & OBD_CONNECT_LRU_RESIZE; +} + +static inline int imp_connect_lru_resize(struct obd_import *imp) +{ + LASSERT(imp != NULL); + return imp->imp_connect_data.ocd_connect_flags & + OBD_CONNECT_LRU_RESIZE; +} + extern struct obd_export *class_conn2export(struct lustre_handle *conn); extern struct obd_device *class_conn2obd(struct lustre_handle *conn); diff --git a/lustre/include/lustre_lib.h b/lustre/include/lustre_lib.h index 1e1a116..d1be77c 100644 --- a/lustre/include/lustre_lib.h +++ b/lustre/include/lustre_lib.h @@ -61,6 +61,7 @@ void target_destroy_export(struct obd_export *exp); int target_handle_reconnect(struct lustre_handle *conn, struct obd_export *exp, struct obd_uuid *cluuid); int target_handle_ping(struct ptlrpc_request *req); +int target_pack_pool_reply(struct ptlrpc_request *req); void target_committed_to_req(struct ptlrpc_request *req); #ifdef HAVE_QUOTA_SUPPORT diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 9153574..3111f41 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -826,6 +826,10 @@ __u32 lustre_msg_get_opc(struct lustre_msg *msg); __u64 lustre_msg_get_last_xid(struct lustre_msg *msg); __u64 lustre_msg_get_last_committed(struct lustre_msg *msg); __u64 lustre_msg_get_transno(struct lustre_msg *msg); +__u64 lustre_msg_get_slv(struct lustre_msg *msg); +__u32 lustre_msg_get_limit(struct lustre_msg *msg); +void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv); +void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit); int lustre_msg_get_status(struct lustre_msg *msg); __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg); int lustre_msg_is_v1(struct lustre_msg *msg); diff --git a/lustre/ldlm/Makefile.am b/lustre/ldlm/Makefile.am index aeb4a06..7beda3d 100644 --- a/lustre/ldlm/Makefile.am +++ b/lustre/ldlm/Makefile.am @@ -10,4 +10,4 @@ MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ DIST_SOURCES = ldlm_extent.c ldlm_flock.c ldlm_internal.h ldlm_lib.c \ ldlm_lock.c ldlm_lockd.c ldlm_plain.c 
ldlm_request.c \ - ldlm_resource.c l_lock.c ldlm_inodebits.c + ldlm_resource.c l_lock.c ldlm_inodebits.c ldlm_pool.c diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h index 165fd6f..936170f 100644 --- a/lustre/ldlm/ldlm_internal.h +++ b/lustre/ldlm/ldlm_internal.h @@ -2,6 +2,8 @@ * vim:expandtab:shiftwidth=8:tabstop=8: */ +#define MAX_STRING_SIZE 128 + /* ldlm_request.c */ typedef enum { LDLM_ASYNC, @@ -11,7 +13,7 @@ typedef enum { /* Cancel lru flag, it indicates we cancel aged locks. */ #define LDLM_CANCEL_AGED 0x00000001 -int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync); +int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync); int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, int count, int max, int flags); @@ -83,6 +85,7 @@ void l_check_ns_lock(struct ldlm_namespace *ns); void l_check_no_ns_lock(struct ldlm_namespace *ns); extern cfs_proc_dir_entry_t *ldlm_svc_proc_dir; +extern cfs_proc_dir_entry_t *ldlm_type_proc_dir; struct ldlm_state { struct ptlrpc_service *ldlm_cb_service; @@ -94,4 +97,3 @@ struct ldlm_state { int ldlm_init(void); void ldlm_exit(void); - diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 48229aa..838e6b7 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -285,7 +285,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) } else { cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT; } - rc = ldlm_get_ref(); + rc = ldlm_get_ref(LDLM_NAMESPACE_CLIENT); if (rc) { CERROR("ldlm_get_ref failed: %d\n", rc); GOTO(err, rc); @@ -336,7 +336,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf) err_import: class_destroy_import(imp); err_ldlm: - ldlm_put_ref(0); + ldlm_put_ref(LDLM_NAMESPACE_CLIENT, 0); err: RETURN(rc); @@ -345,8 +345,7 @@ err: int client_obd_cleanup(struct obd_device *obddev) { ENTRY; - ldlm_put_ref(obddev->obd_force); - + ldlm_put_ref(LDLM_NAMESPACE_CLIENT, obddev->obd_force); RETURN(0); } @@ -376,7 +375,8 @@ int client_connect_import(struct lustre_handle *dlm_handle, if (obd->obd_namespace != NULL) CERROR("already have namespace!\n"); obd->obd_namespace = ldlm_namespace_new(obd->obd_name, - LDLM_NAMESPACE_CLIENT); + LDLM_NAMESPACE_CLIENT, + LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) GOTO(out_disco, rc = -ENOMEM); @@ -1405,6 +1405,30 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc) return 1; } +static inline struct ldlm_pool *ldlm_exp2pl(struct obd_export *exp) +{ + LASSERT(exp != NULL); + return &exp->exp_obd->obd_namespace->ns_pool; +} + +int target_pack_pool_reply(struct ptlrpc_request *req) +{ + struct ldlm_pool *pl; + ENTRY; + + if (!exp_connect_lru_resize(req->rq_export)) + RETURN(0); + + pl = ldlm_exp2pl(req->rq_export); + + spin_lock(&pl->pl_lock); + lustre_msg_set_slv(req->rq_repmsg, ldlm_pool_get_slv(pl)); + lustre_msg_set_limit(req->rq_repmsg, ldlm_pool_get_limit(pl)); + spin_unlock(&pl->pl_lock); + + RETURN(0); +} + int target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id) { @@ -1422,6 +1446,7 @@ target_send_reply_msg (struct ptlrpc_request *req, int rc, int fail_id) DEBUG_REQ(D_NET, req, "sending reply"); } + target_pack_pool_reply(req); return (ptlrpc_send_reply(req, PTLRPC_REPLY_MAYBE_DIFFICULT)); } diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index fceabe1..cc3bbb5 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -562,6 +562,7 @@ void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock, __u32 
mode) LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */ } + void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) { struct ldlm_namespace *ns; @@ -595,7 +596,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) ldlm_lock_remove_from_lru(lock); unlock_res_and_lock(lock); if ((lock->l_flags & LDLM_FL_ATOMIC_CB) || - ldlm_bl_to_thread(ns, NULL, lock, 0) != 0) + ldlm_bl_to_thread(ns, NULL, lock, 0) != 0) ldlm_handle_bl_callback(ns, NULL, lock); } else if (ns->ns_client == LDLM_NAMESPACE_CLIENT && !lock->l_readers && !lock->l_writers && @@ -613,7 +614,7 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode) /* Call ldlm_cancel_lru() only if EARLY_CANCEL is not supported * by the server, otherwise, it is done on enqueue. */ if (!exp_connect_cancelset(lock->l_conn_export)) - ldlm_cancel_lru(ns, LDLM_ASYNC); + ldlm_cancel_lru(ns, 0, LDLM_ASYNC); } else { unlock_res_and_lock(lock); } @@ -864,6 +865,7 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list) if (work_list && lock->l_completion_ast != NULL) ldlm_add_ast_work_item(lock, NULL, work_list); + ldlm_pool_add(&res->lr_namespace->ns_pool, lock); EXIT; } @@ -1522,6 +1524,13 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) ldlm_del_waiting_lock(lock); ldlm_resource_unlink_lock(lock); ldlm_lock_destroy_nolock(lock); + + if (lock->l_granted_mode == lock->l_req_mode) + ldlm_pool_del(&ns->ns_pool, lock); + + /* Make sure we will not be called again for same lock what is possible + * if not to zero out lock->l_granted_mode */ + lock->l_granted_mode = 0; unlock_res_and_lock(lock); EXIT; diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 4145ff4..e79914d 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -44,13 +44,11 @@ extern cfs_mem_cache_t *ldlm_resource_slab; extern cfs_mem_cache_t *ldlm_lock_slab; extern struct lustre_lock ldlm_handle_lock; extern struct list_head ldlm_namespace_list; - extern struct semaphore ldlm_namespace_lock; + static struct semaphore ldlm_ref_sem; static int ldlm_refcount; -/* LDLM state */ - static struct ldlm_state *ldlm_state; inline cfs_time_t round_timeout(cfs_time_t timeout) @@ -1135,8 +1133,6 @@ int ldlm_request_cancel(struct ptlrpc_request *req, int i, count, done = 0; ENTRY; - LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks, " - "starting at %d", dlm_req->lock_count, first); count = dlm_req->lock_count ? 
dlm_req->lock_count : 1; if (first >= count) RETURN(0); @@ -1146,6 +1142,8 @@ int ldlm_request_cancel(struct ptlrpc_request *req, if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) RETURN(0); + LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks", + count - first); for (i = first; i < count; i++) { lock = ldlm_handle2lock(&dlm_req->lock_handle[i]); if (!lock) { @@ -1676,16 +1674,16 @@ static int ldlm_bl_thread_main(void *arg) #endif -static int ldlm_setup(void); -static int ldlm_cleanup(int force); +static int ldlm_setup(ldlm_side_t client); +static int ldlm_cleanup(ldlm_side_t client, int force); -int ldlm_get_ref(void) +int ldlm_get_ref(ldlm_side_t client) { int rc = 0; ENTRY; mutex_down(&ldlm_ref_sem); if (++ldlm_refcount == 1) { - rc = ldlm_setup(); + rc = ldlm_setup(client); if (rc) ldlm_refcount--; } @@ -1694,12 +1692,12 @@ int ldlm_get_ref(void) RETURN(rc); } -void ldlm_put_ref(int force) +void ldlm_put_ref(ldlm_side_t client, int force) { ENTRY; mutex_down(&ldlm_ref_sem); if (ldlm_refcount == 1) { - int rc = ldlm_cleanup(force); + int rc = ldlm_cleanup(client, force); if (rc) CERROR("ldlm_cleanup failed: %d\n", rc); else @@ -1712,7 +1710,7 @@ void ldlm_put_ref(int force) EXIT; } -static int ldlm_setup(void) +static int ldlm_setup(ldlm_side_t client) { struct ldlm_bl_pool *blp; int rc = 0; @@ -1814,6 +1812,12 @@ static int ldlm_setup(void) expired_lock_thread.elt_state == ELT_READY); #endif +#ifdef __KERNEL__ + rc = ldlm_pools_init(client); + if (rc) + GOTO(out_thread, rc); +#endif + RETURN(0); #ifdef __KERNEL__ @@ -1832,7 +1836,7 @@ static int ldlm_setup(void) return rc; } -static int ldlm_cleanup(int force) +static int ldlm_cleanup(ldlm_side_t client, int force) { #ifdef __KERNEL__ struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool; @@ -1846,6 +1850,10 @@ static int ldlm_cleanup(int force) } #ifdef __KERNEL__ + ldlm_pools_fini(); +#endif + +#ifdef __KERNEL__ while (atomic_read(&blp->blp_num_threads) > 0) { struct ldlm_bl_work_item blwi = { .blwi_ns = NULL }; @@ -1998,6 +2006,7 @@ EXPORT_SYMBOL(target_cancel_recovery_timer); EXPORT_SYMBOL(target_send_reply); EXPORT_SYMBOL(target_queue_recovery_request); EXPORT_SYMBOL(target_handle_ping); +EXPORT_SYMBOL(target_pack_pool_reply); EXPORT_SYMBOL(target_handle_disconnect); EXPORT_SYMBOL(target_queue_final_reply); diff --git a/lustre/ldlm/ldlm_pool.c b/lustre/ldlm/ldlm_pool.c new file mode 100644 index 0000000..e609571 --- /dev/null +++ b/lustre/ldlm/ldlm_pool.c @@ -0,0 +1,948 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (c) 2007 Cluster File Systems, Inc. + * Author: Yury Umanets + * + * This file is part of the Lustre file system, http://www.lustre.org + * Lustre is a trademark of Cluster File Systems, Inc. + * + * You may have signed or agreed to another license before downloading + * this software. If so, you are bound by the terms and conditions + * of that agreement, and the following does not apply to you. See the + * LICENSE file included with this distribution for more information. + * + * If you did not agree to a different license, then this copy of Lustre + * is open source software; you can redistribute it and/or modify it + * under the terms of version 2 of the GNU General Public License as + * published by the Free Software Foundation. 
+ *
+ * In either case, Lustre is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * license text for more details.
+ */
+
+/* The idea of this code is rather simple. Every second, for each server
+ * namespace we compute the SLV - the server lock volume - from the current
+ * number of granted locks, the grant speed for the past period, etc - that
+ * is, from the locking load. For simplicity, this SLV number may be thought
+ * of as defining a flow. It is sent to clients at every opportunity, to let
+ * them know the current load situation on the server. By default, at
+ * startup, the SLV on the server is set to the maximum value, calculated so
+ * as to allow one client to hold all ->pl_limit locks for 10h.
+ *
+ * Next, on clients, the number of cached locks is no longer limited
+ * artificially, as it was before. Instead, the client calculates a CLV -
+ * a client lock volume - for each lock and compares it with the last SLV
+ * received from the server. The CLV is calculated as the number of locks
+ * in the LRU times the lock's live time in seconds. If CLV > SLV, the
+ * lock is canceled.
+ *
+ * The client also has an LVF - a lock volume factor - which regulates how
+ * sensitive the client is to the last SLV from the server. The higher the
+ * LVF, the more locks are canceled on the client. Its default value is 1.
+ * Setting LVF to 2 makes the client cancel locks twice as fast.
+ *
+ * Locks on a client are canceled more intensively in these cases:
+ * (1) the SLV is smaller, that is, the load on the server is higher;
+ * (2) the client holds many locks (the more locks it holds, the higher
+ *     the chance that some of them should be canceled);
+ * (3) the client holds old locks (taken some time ago).
+ *
+ * Thus, in the flow paradigm that we use to better understand the SLV, the
+ * CLV is the volume of a particle in the flow described by the SLV.
+ * Accordingly, if the flow is getting thinner, more and more particles fall
+ * outside of it, and as the particles are locks, those locks should be
+ * canceled.
+ *
+ * The general idea belongs to Vitaly Fertman (vitaly@clusterfs.com). Andreas
+ * Dilger (adilger@clusterfs.com) proposed a few nice ideas, such as using
+ * the LVF, and many cleanups. The flow definition that makes the logic
+ * easier to understand belongs to Nikita Danilov (nikita@clusterfs.com), as
+ * do many cleanups and fixes. The design and implementation are by Yury
+ * Umanets (umka@clusterfs.com).
+ *
+ * Glossary for terms used:
+ *
+ * pl_limit - Number of allowed locks in pool. Applies to server and client
+ * side (tunable);
+ *
+ * pl_granted - Number of granted locks (calculated);
+ * pl_grant_rate - Number of granted locks for last T (calculated);
+ * pl_cancel_rate - Number of canceled locks for last T (calculated);
+ * pl_grant_speed - Grant speed (GR - CR) for last T (calculated);
+ * pl_grant_plan - Planned number of granted locks for next T (calculated);
+ *
+ * pl_grant_step - Grant plan step, that is, how ->pl_grant_plan
+ * will change in next T (tunable);
+ *
+ * pl_server_lock_volume - Current server lock volume (calculated);
+ *
+ * As can be seen from the list above, there are several tunables which can
+ * greatly affect behavior. They can all be modified via proc. They also
+ * make it possible to construct a few pre-defined behavior policies.
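+ *
+ * For example (an illustration only, not a policy shipped as such): the
+ * per-pool "control" proc file introduced below exposes the ->pl_control
+ * mask, so writing 0 to it disables both recalculation and shrinking for
+ * that pool, while writing 3 (LDLM_POOL_CTL_FULL) enables both again.
+ *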
+ * If none of the predefined policies suits the workload in use, a new one
+ * can be "constructed" via the proc tunables.
+ */
+
+#define DEBUG_SUBSYSTEM S_LDLM
+
+#ifdef __KERNEL__
+# include <lustre_dlm.h>
+#else
+# include <liblustre.h>
+# include <libcfs/kp30.h>
+#endif
+
+#include <obd_class.h>
+#include <obd_support.h>
+#include "ldlm_internal.h"
+
+#ifdef HAVE_LRU_RESIZE_SUPPORT
+
+/* 50 ldlm locks for 1MB of RAM. */
+#define LDLM_POOL_HOST_L ((num_physpages >> (20 - PAGE_SHIFT)) * 50)
+
+/* Default step in % for grant plan. */
+#define LDLM_POOL_GSP (5)
+
+/* LDLM_POOL_GSP% of all locks is default GP. */
+#define LDLM_POOL_GP(L) ((L) * LDLM_POOL_GSP / 100)
+
+/* Max age for locks on clients. */
+#define LDLM_POOL_MAX_AGE (36000)
+
+#ifdef __KERNEL__
+extern cfs_proc_dir_entry_t *ldlm_ns_proc_dir;
+#endif
+
+extern atomic_t ldlm_srv_namespace_nr;
+extern atomic_t ldlm_cli_namespace_nr;
+extern struct list_head ldlm_namespace_list;
+extern struct semaphore ldlm_namespace_lock;
+
+#define avg(src, add) \
+        ((src) = ((src) + (add)) / 2)
+
+static inline __u64 dru(__u64 val, __u32 div)
+{
+        __u64 ret = val + (div - 1);
+        do_div(ret, div);
+        return ret;
+}
+
+static inline __u64 ldlm_pool_slv_max(__u32 L)
+{
+        /* Allow one client to hold all the locks for 10 hrs. The formula
+         * is: limit * 10h / 1 client. The cast matters: L is __u32 and
+         * the product must not be truncated to 32 bits. */
+        __u64 lim = (__u64)L * LDLM_POOL_MAX_AGE / 1;
+        return lim;
+}
+
+static inline __u64 ldlm_pool_slv_min(__u32 L)
+{
+        return 1;
+}
+
+enum {
+        LDLM_POOL_GRANTED_STAT = 0,
+        LDLM_POOL_GRANT_RATE_STAT,
+        LDLM_POOL_CANCEL_RATE_STAT,
+        LDLM_POOL_GRANT_PLAN_STAT,
+        LDLM_POOL_SLV_STAT,
+        LDLM_POOL_LAST_STAT
+};
+
+static inline struct ldlm_namespace *ldlm_pl2ns(struct ldlm_pool *pl)
+{
+        return container_of(pl, struct ldlm_namespace, ns_pool);
+}
+
+static int ldlm_srv_pool_recalc(struct ldlm_pool *pl)
+{
+        int slv_factor, limit, granted, grant_speed;
+        int grant_rate, cancel_rate, grant_step;
+        time_t recalc_interval_sec;
+        __u32 grant_plan;
+        __u64 slv;
+        ENTRY;
+
+        spin_lock(&pl->pl_lock);
+
+        /* Copy all values to local variables so that none of them can
+         * change in the middle of the recalculation. */
+        slv = ldlm_pool_get_slv(pl);
+        limit = ldlm_pool_get_limit(pl);
+        granted = atomic_read(&pl->pl_granted);
+        grant_rate = atomic_read(&pl->pl_grant_rate);
+        grant_plan = atomic_read(&pl->pl_grant_plan);
+        grant_step = atomic_read(&pl->pl_grant_step);
+        grant_speed = atomic_read(&pl->pl_grant_speed);
+        cancel_rate = atomic_read(&pl->pl_cancel_rate);
+
+        /* Zero out grant/cancel rates and speed for this T. */
+        atomic_set(&pl->pl_grant_rate, 0);
+        atomic_set(&pl->pl_cancel_rate, 0);
+        atomic_set(&pl->pl_grant_speed, 0);
+
+        /* Make sure that we use correct data for statistics. The pools
+         * thread may not get scheduled for a long time due to heavy CPU
+         * contention; we need to catch that. */
+        recalc_interval_sec = cfs_duration_sec(cfs_time_current() -
+                                               pl->pl_update_time);
+        if (recalc_interval_sec == 0)
+                recalc_interval_sec = 1;
+
+        lprocfs_counter_add(pl->pl_stats, LDLM_POOL_SLV_STAT, slv);
+        lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANTED_STAT,
+                            granted);
+        lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT,
+                            grant_rate / recalc_interval_sec);
+        lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT,
+                            grant_plan / recalc_interval_sec);
+        lprocfs_counter_add(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT,
+                            cancel_rate / recalc_interval_sec);
+
+        /* Correct the old @grant_plan, which may be obsolete in the case of
+         * heavy load on the server, when the pools thread is not scheduled
+         * every 1s sharp (current period).
+         * All values used in the calculation are updated by other threads
+         * and are up-to-date. Only @grant_plan is calculated by the pools
+         * thread and directly affects the SLV. */
+        grant_plan += grant_speed - (grant_speed / recalc_interval_sec);
+
+        if ((slv_factor = limit - (granted - grant_plan)) <= 0)
+                slv_factor = 1;
+
+        grant_plan = granted + ((limit - granted) * grant_step) / 100;
+        slv = (slv * ((slv_factor * 100) / limit));
+        slv = dru(slv, 100);
+
+        if (slv > ldlm_pool_slv_max(limit)) {
+                CDEBUG(D_DLMTRACE, "Correcting SLV to allowed max "LPU64"\n",
+                       ldlm_pool_slv_max(limit));
+                slv = ldlm_pool_slv_max(limit);
+        } else if (slv < ldlm_pool_slv_min(limit)) {
+                CDEBUG(D_DLMTRACE, "Correcting SLV to allowed min "LPU64"\n",
+                       ldlm_pool_slv_min(limit));
+                slv = ldlm_pool_slv_min(limit);
+        }
+
+        ldlm_pool_set_slv(pl, slv);
+        atomic_set(&pl->pl_grant_plan, grant_plan);
+        pl->pl_update_time = cfs_time_current();
+        spin_unlock(&pl->pl_lock);
+
+        RETURN(0);
+}
+
+/* Our goal here is to decrease the SLV in such a way that clients hold
+ * @nr fewer locks over the next 10h. */
+static int ldlm_srv_pool_shrink(struct ldlm_pool *pl,
+                                int nr, unsigned int gfp_mask)
+{
+        __u32 granted, limit;
+        __u64 slv_delta;
+        ENTRY;
+
+        /* The clients may already have canceled all their locks while the
+         * server is in the shrinker and cannot cancel anything itself.
+         * Catch this race. */
+        if ((granted = atomic_read(&pl->pl_granted)) == 0)
+                RETURN(0);
+
+        spin_lock(&pl->pl_lock);
+
+        /* A simple proportion, but it gives an estimate of how much the SLV
+         * should change for the requested @nr of locks to be canceled. */
+        slv_delta = nr * ldlm_pool_get_slv(pl);
+        limit = ldlm_pool_get_limit(pl);
+        do_div(slv_delta, granted);
+
+        /* As the SLV depends on historical data, that is, each new value is
+         * based on the old one, this decrease will make clients return some
+         * locks to the server, and after some time things will stabilize. */
+        if (slv_delta < ldlm_pool_get_slv(pl))
+                ldlm_pool_set_slv(pl, ldlm_pool_get_slv(pl) - slv_delta);
+        else
+                ldlm_pool_set_slv(pl, ldlm_pool_slv_min(limit));
+        spin_unlock(&pl->pl_lock);
+
+        /* We have not really freed any memory here so far; it may only be
+         * freed later, so return 0 to avoid confusing the VM.
*/ + RETURN(0); +} + +static int ldlm_cli_pool_recalc(struct ldlm_pool *pl) +{ + int grant_rate, cancel_rate; + time_t recalc_interval_sec; + ENTRY; + + spin_lock(&pl->pl_lock); + grant_rate = atomic_read(&pl->pl_grant_rate); + cancel_rate = atomic_read(&pl->pl_cancel_rate); + + recalc_interval_sec = cfs_duration_sec(cfs_time_current() - + pl->pl_update_time); + if (recalc_interval_sec == 0) + recalc_interval_sec = 1; + + lprocfs_counter_add(pl->pl_stats, LDLM_POOL_SLV_STAT, + ldlm_pool_get_slv(pl)); + lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANTED_STAT, + atomic_read(&pl->pl_granted)); + lprocfs_counter_add(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT, + grant_rate / recalc_interval_sec); + lprocfs_counter_add(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT, + cancel_rate / recalc_interval_sec); + + spin_unlock(&pl->pl_lock); + + ldlm_cancel_lru(ldlm_pl2ns(pl), 0, LDLM_ASYNC); + RETURN(0); +} + +static int ldlm_cli_pool_shrink(struct ldlm_pool *pl, + int nr, unsigned int gfp_mask) +{ + ENTRY; + RETURN(ldlm_cancel_lru(ldlm_pl2ns(pl), nr, LDLM_SYNC)); +} + +int ldlm_pool_recalc(struct ldlm_pool *pl) +{ + if (pl->pl_recalc != NULL && pool_recalc_enabled(pl)) + return pl->pl_recalc(pl); + return 0; +} +EXPORT_SYMBOL(ldlm_pool_recalc); + +int ldlm_pool_shrink(struct ldlm_pool *pl, int nr, + unsigned int gfp_mask) +{ + if (pl->pl_shrink != NULL && pool_shrink_enabled(pl)) { + CDEBUG(D_DLMTRACE, "%s: request to shrink %d locks\n", + pl->pl_name, nr); + return pl->pl_shrink(pl, nr, gfp_mask); + } + return 0; +} +EXPORT_SYMBOL(ldlm_pool_shrink); + +/* The purpose of this function is to re-setup limit and maximal allowed + * slv according to the passed limit. */ +int ldlm_pool_setup(struct ldlm_pool *pl, __u32 limit) +{ + ENTRY; + if (ldlm_pl2ns(pl)->ns_client == LDLM_NAMESPACE_SERVER) { + spin_lock(&pl->pl_lock); + ldlm_pool_set_limit(pl, limit); + spin_unlock(&pl->pl_lock); + } + RETURN(0); +} +EXPORT_SYMBOL(ldlm_pool_setup); + +#ifdef __KERNEL__ +static int lprocfs_rd_pool_state(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + int nr = 0, granted, grant_rate, cancel_rate; + int grant_speed, grant_plan, grant_step; + struct ldlm_pool *pl = data; + __u32 limit; + __u64 slv; + + spin_lock(&pl->pl_lock); + slv = pl->pl_server_lock_volume; + limit = ldlm_pool_get_limit(pl); + granted = atomic_read(&pl->pl_granted); + grant_rate = atomic_read(&pl->pl_grant_rate); + cancel_rate = atomic_read(&pl->pl_cancel_rate); + grant_speed = atomic_read(&pl->pl_grant_speed); + grant_plan = atomic_read(&pl->pl_grant_plan); + grant_step = atomic_read(&pl->pl_grant_step); + spin_unlock(&pl->pl_lock); + + nr += snprintf(page + nr, count - nr, "LDLM pool state (%s):\n", + pl->pl_name); + nr += snprintf(page + nr, count - nr, " SLV: "LPU64"\n", slv); + if (ldlm_pl2ns(pl)->ns_client == LDLM_NAMESPACE_SERVER) { + nr += snprintf(page + nr, count - nr, " GSP: %d%%\n", + grant_step); + nr += snprintf(page + nr, count - nr, " GP: %d\n", + grant_plan); + } else { + nr += snprintf(page + nr, count - nr, " LVF: %d\n", + atomic_read(&pl->pl_lock_volume_factor)); + } + nr += snprintf(page + nr, count - nr, " GR: %d\n", grant_rate); + nr += snprintf(page + nr, count - nr, " CR: %d\n", cancel_rate); + nr += snprintf(page + nr, count - nr, " GS: %d\n", grant_speed); + nr += snprintf(page + nr, count - nr, " G: %d\n", granted); + nr += snprintf(page + nr, count - nr, " L: %d\n", limit); + return nr; +} + +static int ldlm_pool_proc_init(struct ldlm_pool *pl) +{ + struct ldlm_namespace *ns = ldlm_pl2ns(pl); + struct 
proc_dir_entry *parent_ns_proc; + struct lprocfs_vars pool_vars[2]; + char *var_name = NULL; + int rc = 0; + ENTRY; + + OBD_ALLOC(var_name, MAX_STRING_SIZE + 1); + if (!var_name) + RETURN(-ENOMEM); + + parent_ns_proc = lprocfs_srch(ldlm_ns_proc_dir, ns->ns_name); + if (parent_ns_proc == NULL) { + CERROR("%s: proc entry is not initialized\n", + ns->ns_name); + GOTO(out_free_name, rc = -EINVAL); + } + pl->pl_proc_dir = lprocfs_register("pool", parent_ns_proc, + NULL, NULL); + if (IS_ERR(pl->pl_proc_dir)) { + CERROR("LProcFS failed in ldlm-pool-init\n"); + rc = PTR_ERR(pl->pl_proc_dir); + GOTO(out_free_name, rc); + } + + var_name[MAX_STRING_SIZE] = '\0'; + memset(pool_vars, 0, sizeof(pool_vars)); + pool_vars[0].name = var_name; + + snprintf(var_name, MAX_STRING_SIZE, "server_lock_volume"); + pool_vars[0].data = &pl->pl_server_lock_volume; + pool_vars[0].read_fptr = lprocfs_rd_u64; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + + snprintf(var_name, MAX_STRING_SIZE, "limit"); + pool_vars[0].data = &pl->pl_limit; + pool_vars[0].read_fptr = lprocfs_rd_atomic; + pool_vars[0].write_fptr = lprocfs_wr_atomic; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + + snprintf(var_name, MAX_STRING_SIZE, "granted"); + pool_vars[0].data = &pl->pl_granted; + pool_vars[0].read_fptr = lprocfs_rd_atomic; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + + snprintf(var_name, MAX_STRING_SIZE, "control"); + pool_vars[0].data = &pl->pl_control; + pool_vars[0].read_fptr = lprocfs_rd_uint; + pool_vars[0].write_fptr = lprocfs_wr_uint; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + + snprintf(var_name, MAX_STRING_SIZE, "grant_speed"); + pool_vars[0].data = &pl->pl_grant_speed; + pool_vars[0].read_fptr = lprocfs_rd_atomic; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + + snprintf(var_name, MAX_STRING_SIZE, "cancel_rate"); + pool_vars[0].data = &pl->pl_cancel_rate; + pool_vars[0].read_fptr = lprocfs_rd_atomic; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + + snprintf(var_name, MAX_STRING_SIZE, "grant_rate"); + pool_vars[0].data = &pl->pl_grant_rate; + pool_vars[0].read_fptr = lprocfs_rd_atomic; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + + if (ns->ns_client == LDLM_NAMESPACE_SERVER) { + snprintf(var_name, MAX_STRING_SIZE, "grant_plan"); + pool_vars[0].data = &pl->pl_grant_plan; + pool_vars[0].read_fptr = lprocfs_rd_atomic; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + + snprintf(var_name, MAX_STRING_SIZE, "grant_step"); + pool_vars[0].data = &pl->pl_grant_step; + pool_vars[0].read_fptr = lprocfs_rd_atomic; + pool_vars[0].write_fptr = lprocfs_wr_atomic; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + } else { + snprintf(var_name, MAX_STRING_SIZE, "lock_volume_factor"); + pool_vars[0].data = &pl->pl_lock_volume_factor; + pool_vars[0].read_fptr = lprocfs_rd_uint; + pool_vars[0].write_fptr = lprocfs_wr_uint; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + } + + snprintf(var_name, MAX_STRING_SIZE, "state"); + pool_vars[0].data = pl; + pool_vars[0].read_fptr = lprocfs_rd_pool_state; + lprocfs_add_vars(pl->pl_proc_dir, pool_vars, 0); + + pl->pl_stats = lprocfs_alloc_stats(LDLM_POOL_LAST_STAT - + LDLM_POOL_GRANTED_STAT); + if (!pl->pl_stats) + GOTO(out_free_name, rc = -ENOMEM); + + lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANTED_STAT, + LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV, + "granted", "locks"); + lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_RATE_STAT, + LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV, + "grant_rate", "locks/s"); + 
lprocfs_counter_init(pl->pl_stats, LDLM_POOL_CANCEL_RATE_STAT, + LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV, + "cancel_rate", "locks/s"); + lprocfs_counter_init(pl->pl_stats, LDLM_POOL_GRANT_PLAN_STAT, + LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV, + "grant_plan", "locks/s"); + lprocfs_counter_init(pl->pl_stats, LDLM_POOL_SLV_STAT, + LPROCFS_CNTR_AVGMINMAX | LPROCFS_CNTR_STDDEV, + "slv", "slv"); + lprocfs_register_stats(pl->pl_proc_dir, "stats", pl->pl_stats); + + EXIT; +out_free_name: + OBD_FREE(var_name, MAX_STRING_SIZE + 1); + return rc; +} + +static void ldlm_pool_proc_fini(struct ldlm_pool *pl) +{ + if (pl->pl_stats != NULL) { + lprocfs_free_stats(&pl->pl_stats); + pl->pl_stats = NULL; + } + if (pl->pl_proc_dir != NULL) { + lprocfs_remove(&pl->pl_proc_dir); + pl->pl_proc_dir = NULL; + } +} +#else /* !__KERNEL__*/ +#define ldlm_pool_proc_init(pl) (0) +#define ldlm_pool_proc_fini(pl) while (0) {} +#endif + +int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns, + int idx, ldlm_side_t client) +{ + int rc; + ENTRY; + + spin_lock_init(&pl->pl_lock); + atomic_set(&pl->pl_granted, 0); + pl->pl_update_time = cfs_time_current(); + atomic_set(&pl->pl_lock_volume_factor, 1); + + atomic_set(&pl->pl_grant_rate, 0); + atomic_set(&pl->pl_cancel_rate, 0); + atomic_set(&pl->pl_grant_speed, 0); + pl->pl_control = LDLM_POOL_CTL_FULL; + atomic_set(&pl->pl_grant_step, LDLM_POOL_GSP); + atomic_set(&pl->pl_grant_plan, LDLM_POOL_GP(LDLM_POOL_HOST_L)); + + snprintf(pl->pl_name, sizeof(pl->pl_name), "ldlm-pool-%s-%d", + ns->ns_name, idx); + + if (client == LDLM_NAMESPACE_SERVER) { + pl->pl_recalc = ldlm_srv_pool_recalc; + pl->pl_shrink = ldlm_srv_pool_shrink; + ldlm_pool_set_limit(pl, LDLM_POOL_HOST_L); + ldlm_pool_set_slv(pl, ldlm_pool_slv_max(LDLM_POOL_HOST_L)); + } else { + ldlm_pool_set_slv(pl, 1); + ldlm_pool_set_limit(pl, 1); + pl->pl_recalc = ldlm_cli_pool_recalc; + pl->pl_shrink = ldlm_cli_pool_shrink; + } + + rc = ldlm_pool_proc_init(pl); + if (rc) + RETURN(rc); + + CDEBUG(D_DLMTRACE, "Lock pool %s is initialized\n", pl->pl_name); + + RETURN(rc); +} +EXPORT_SYMBOL(ldlm_pool_init); + +void ldlm_pool_fini(struct ldlm_pool *pl) +{ + ENTRY; + ldlm_pool_proc_fini(pl); + pl->pl_recalc = NULL; + pl->pl_shrink = NULL; + EXIT; +} +EXPORT_SYMBOL(ldlm_pool_fini); + +void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock) +{ + ENTRY; + atomic_inc(&pl->pl_granted); + atomic_inc(&pl->pl_grant_rate); + atomic_inc(&pl->pl_grant_speed); + EXIT; +} +EXPORT_SYMBOL(ldlm_pool_add); + +void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock) +{ + ENTRY; + LASSERT(atomic_read(&pl->pl_granted) > 0); + atomic_dec(&pl->pl_granted); + atomic_inc(&pl->pl_cancel_rate); + atomic_dec(&pl->pl_grant_speed); + EXIT; +} +EXPORT_SYMBOL(ldlm_pool_del); + +/* ->pl_lock should be taken. */ +__u64 ldlm_pool_get_slv(struct ldlm_pool *pl) +{ + return pl->pl_server_lock_volume; +} +EXPORT_SYMBOL(ldlm_pool_get_slv); + +/* ->pl_lock should be taken. */ +void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv) +{ + pl->pl_server_lock_volume = slv; +} +EXPORT_SYMBOL(ldlm_pool_set_slv); + +__u32 ldlm_pool_get_limit(struct ldlm_pool *pl) +{ + return atomic_read(&pl->pl_limit); +} +EXPORT_SYMBOL(ldlm_pool_get_limit); + +void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit) +{ + atomic_set(&pl->pl_limit, limit); +} +EXPORT_SYMBOL(ldlm_pool_set_limit); + +/* Server side is only enabled for kernel space for now. 
*/ +#ifdef __KERNEL__ +static int ldlm_pool_granted(struct ldlm_pool *pl) +{ + return atomic_read(&pl->pl_granted); +} + +static struct ptlrpc_thread *ldlm_pools_thread; +static struct shrinker *ldlm_pools_shrinker; +static struct completion ldlm_pools_comp; + +static int ldlm_pools_thread_main(void *arg) +{ + struct ptlrpc_thread *thread = (struct ptlrpc_thread *)arg; + char *t_name = "ldlm_poold"; + ENTRY; + + cfs_daemonize(t_name); + thread->t_flags = SVC_RUNNING; + cfs_waitq_signal(&thread->t_ctl_waitq); + + CDEBUG(D_DLMTRACE, "%s: pool thread starting, process %d\n", + t_name, cfs_curproc_pid()); + + while (1) { + __u32 nr_l = 0, nr_p = 0, l; + struct ldlm_namespace *ns; + struct l_wait_info lwi; + int rc, equal = 0; + + /* Check all namespaces. */ + mutex_down(&ldlm_namespace_lock); + list_for_each_entry(ns, &ldlm_namespace_list, ns_list_chain) { + if (ns->ns_appetite != LDLM_NAMESPACE_MODEST) + continue; + + if (ns->ns_client == LDLM_NAMESPACE_SERVER) { + l = ldlm_pool_granted(&ns->ns_pool); + if (l == 0) + l = 1; + + /* Set the modest pools limit equal to + * their avg granted locks + 5%. */ + l += dru(l * LDLM_POOLS_MODEST_MARGIN, 100); + ldlm_pool_setup(&ns->ns_pool, l); + nr_l += l; + nr_p++; + } + + /* After setup is done - recalc the pool. */ + rc = ldlm_pool_recalc(&ns->ns_pool); + if (rc) + CERROR("%s: pool recalculation error " + "%d\n", ns->ns_pool.pl_name, rc); + } + + if (nr_l >= 2 * (LDLM_POOL_HOST_L / 3)) { + CWARN("Modest pools eat out 2/3 of locks limit. %d of %lu. " + "Upgrade server!\n", nr_l, LDLM_POOL_HOST_L); + equal = 1; + } + + list_for_each_entry(ns, &ldlm_namespace_list, ns_list_chain) { + if (!equal && ns->ns_appetite != LDLM_NAMESPACE_GREEDY) + continue; + + if (ns->ns_client == LDLM_NAMESPACE_SERVER) { + if (equal) { + /* In the case 2/3 locks are eaten out by + * modest pools, we re-setup equal limit + * for _all_ pools. */ + l = LDLM_POOL_HOST_L / + atomic_read(&ldlm_srv_namespace_nr); + } else { + /* All the rest of greedy pools will have + * all locks in equal parts.*/ + l = (LDLM_POOL_HOST_L - nr_l) / + (atomic_read(&ldlm_srv_namespace_nr) - + nr_p); + } + ldlm_pool_setup(&ns->ns_pool, l); + } + + /* After setup is done - recalc the pool. */ + rc = ldlm_pool_recalc(&ns->ns_pool); + if (rc) + CERROR("%s: pool recalculation error " + "%d\n", ns->ns_pool.pl_name, rc); + } + mutex_up(&ldlm_namespace_lock); + + /* Wait until the next check time, or until we're + * stopped. */ + lwi = LWI_TIMEOUT(cfs_time_seconds(LDLM_POOLS_THREAD_PERIOD), + NULL, NULL); + l_wait_event(thread->t_ctl_waitq, (thread->t_flags & + (SVC_STOPPING|SVC_EVENT)), + &lwi); + + if (thread->t_flags & SVC_STOPPING) { + thread->t_flags &= ~SVC_STOPPING; + break; + } else if (thread->t_flags & SVC_EVENT) { + thread->t_flags &= ~SVC_EVENT; + } + } + + thread->t_flags = SVC_STOPPED; + cfs_waitq_signal(&thread->t_ctl_waitq); + + CDEBUG(D_DLMTRACE, "%s: pool thread exiting, process %d\n", + t_name, cfs_curproc_pid()); + + complete_and_exit(&ldlm_pools_comp, 0); +} + +static int ldlm_pools_thread_start(ldlm_side_t client) +{ + struct l_wait_info lwi = { 0 }; + int rc; + ENTRY; + + if (ldlm_pools_thread != NULL) + RETURN(-EALREADY); + + OBD_ALLOC_PTR(ldlm_pools_thread); + if (ldlm_pools_thread == NULL) + RETURN(-ENOMEM); + + ldlm_pools_thread->t_id = client; + init_completion(&ldlm_pools_comp); + cfs_waitq_init(&ldlm_pools_thread->t_ctl_waitq); + + /* CLONE_VM and CLONE_FILES just avoid a needless copy, because we + * just drop the VM and FILES in ptlrpc_daemonize() right away. 
*/ + rc = cfs_kernel_thread(ldlm_pools_thread_main, ldlm_pools_thread, + CLONE_VM | CLONE_FILES); + if (rc < 0) { + CERROR("Can't start pool thread, error %d\n", + rc); + OBD_FREE(ldlm_pools_thread, sizeof(*ldlm_pools_thread)); + ldlm_pools_thread = NULL; + RETURN(rc); + } + l_wait_event(ldlm_pools_thread->t_ctl_waitq, + (ldlm_pools_thread->t_flags & SVC_RUNNING), &lwi); + RETURN(0); +} + +static void ldlm_pools_thread_stop(void) +{ + ENTRY; + + if (ldlm_pools_thread == NULL) { + EXIT; + return; + } + + ldlm_pools_thread->t_flags = SVC_STOPPING; + cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq); + + /* Make sure that pools thread is finished before freeing @thread. + * This fixes possible race and oops due to accessing freed memory + * in pools thread. */ + wait_for_completion(&ldlm_pools_comp); + OBD_FREE_PTR(ldlm_pools_thread); + ldlm_pools_thread = NULL; + EXIT; +} + +int ldlm_pools_init(ldlm_side_t client) +{ + int rc; + ENTRY; + + rc = ldlm_pools_thread_start(client); + if (rc == 0) + ldlm_pools_shrinker = set_shrinker(DEFAULT_SEEKS, + ldlm_pools_shrink); + RETURN(rc); +} +EXPORT_SYMBOL(ldlm_pools_init); + +void ldlm_pools_fini(void) +{ + if (ldlm_pools_shrinker != NULL) { + remove_shrinker(ldlm_pools_shrinker); + ldlm_pools_shrinker = NULL; + } + ldlm_pools_thread_stop(); +} +EXPORT_SYMBOL(ldlm_pools_fini); + +void ldlm_pools_wakeup(void) +{ + ENTRY; + if (ldlm_pools_thread == NULL) + return; + ldlm_pools_thread->t_flags |= SVC_EVENT; + cfs_waitq_signal(&ldlm_pools_thread->t_ctl_waitq); + EXIT; +} +EXPORT_SYMBOL(ldlm_pools_wakeup); + +/* Cancel @nr locks from all namespaces (if possible). Returns number of + * cached locks after shrink is finished. All namespaces are asked to + * cancel approximately equal amount of locks. */ +int ldlm_pools_shrink(int nr, unsigned int gfp_mask) +{ + struct ldlm_namespace *ns; + int total = 0, cached = 0; + + if (nr != 0 && !(gfp_mask & __GFP_FS)) + return -1; + + CDEBUG(D_DLMTRACE, "request to shrink %d locks from all pools\n", + nr); + mutex_down(&ldlm_namespace_lock); + list_for_each_entry(ns, &ldlm_namespace_list, ns_list_chain) + total += ldlm_pool_granted(&ns->ns_pool); + + if (nr == 0) { + mutex_up(&ldlm_namespace_lock); + return total; + } + + /* Check all namespaces. 
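+         * Each pool is asked to cancel its proportional share of @nr:
+         * roughly 1 + nr_locks * nr / total. A hypothetical walk-through:
+         * with nr = 100 and two pools holding 300 and 700 granted locks,
+         * they are asked to cancel 31 and 71 locks respectively.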
*/ + list_for_each_entry(ns, &ldlm_namespace_list, ns_list_chain) { + struct ldlm_pool *pl = &ns->ns_pool; + int cancel, nr_locks; + + nr_locks = ldlm_pool_granted(&ns->ns_pool); + cancel = 1 + nr_locks * nr / total; + cancel = ldlm_pool_shrink(pl, cancel, gfp_mask); + cached += ldlm_pool_granted(&ns->ns_pool); + } + mutex_up(&ldlm_namespace_lock); + return cached; +} +EXPORT_SYMBOL(ldlm_pools_shrink); +#endif /* __KERNEL__ */ + +#else /* !HAVE_LRU_RESIZE_SUPPORT */ +int ldlm_pool_setup(struct ldlm_pool *pl, __u32 limit) +{ + return 0; +} +EXPORT_SYMBOL(ldlm_pool_setup); + +int ldlm_pool_recalc(struct ldlm_pool *pl) +{ + return 0; +} +EXPORT_SYMBOL(ldlm_pool_recalc); + +int ldlm_pool_shrink(struct ldlm_pool *pl, + int nr, unsigned int gfp_mask) +{ + return 0; +} +EXPORT_SYMBOL(ldlm_pool_shrink); + +int ldlm_pool_init(struct ldlm_pool *pl, struct ldlm_namespace *ns, + int idx, ldlm_side_t client) +{ + return 0; +} +EXPORT_SYMBOL(ldlm_pool_init); + +void ldlm_pool_fini(struct ldlm_pool *pl) +{ + return; +} +EXPORT_SYMBOL(ldlm_pool_fini); + +void ldlm_pool_add(struct ldlm_pool *pl, struct ldlm_lock *lock) +{ + return; +} +EXPORT_SYMBOL(ldlm_pool_add); + +void ldlm_pool_del(struct ldlm_pool *pl, struct ldlm_lock *lock) +{ + return; +} +EXPORT_SYMBOL(ldlm_pool_del); + +__u64 ldlm_pool_get_slv(struct ldlm_pool *pl) +{ + return 1; +} +EXPORT_SYMBOL(ldlm_pool_get_slv); + +void ldlm_pool_set_slv(struct ldlm_pool *pl, __u64 slv) +{ + return; +} +EXPORT_SYMBOL(ldlm_pool_set_slv); + +__u32 ldlm_pool_get_limit(struct ldlm_pool *pl) +{ + return 0; +} +EXPORT_SYMBOL(ldlm_pool_get_limit); + +void ldlm_pool_set_limit(struct ldlm_pool *pl, __u32 limit) +{ + return; +} +EXPORT_SYMBOL(ldlm_pool_set_limit); + +int ldlm_pools_init(ldlm_side_t client) +{ + return 0; +} +EXPORT_SYMBOL(ldlm_pools_init); + +void ldlm_pools_fini(void) +{ + return; +} +EXPORT_SYMBOL(ldlm_pools_fini); + +void ldlm_pools_wakeup(void) +{ + return; +} +EXPORT_SYMBOL(ldlm_pools_wakeup); +#endif /* HAVE_LRU_RESIZE_SUPPORT */ diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index d9f5dcf..4dbe099 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -501,14 +501,16 @@ struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp, /* Estimate the amount of free space in the request. */ int avail = ldlm_req_handles_avail(exp, size, bufcount, LDLM_ENQUEUE_CANCEL_OFF); + LASSERT(avail >= count); /* Cancel lru locks here _only_ if the server supports * EARLY_CANCEL. Otherwise we have to send extra CANCEL * rpc right on enqueue, what will make it slower, vs. * asynchronous rpc in blocking thread. */ - count += ldlm_cancel_lru_local(ns, cancels, 1, avail - count, - LDLM_CANCEL_AGED); + count += ldlm_cancel_lru_local(ns, cancels, + exp_connect_lru_resize(exp) ? 0 : 1, + avail - count, LDLM_CANCEL_AGED); size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_ENQUEUE); } @@ -921,10 +923,54 @@ out: return sent ? 
sent : rc;
 }
 
+static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
+{
+        LASSERT(imp != NULL);
+        return &imp->imp_obd->obd_namespace->ns_pool;
+}
+
+int ldlm_cli_update_pool(struct ptlrpc_request *req)
+{
+        struct ldlm_pool *pl;
+        ENTRY;
+
+        if (!imp_connect_lru_resize(req->rq_import))
+                RETURN(0);
+
+        pl = ldlm_imp2pl(req->rq_import);
+
+        spin_lock(&pl->pl_lock);
+#ifdef __KERNEL__
+        {
+        __u64 old_slv, fast_slv_change;
+
+        old_slv = ldlm_pool_get_slv(pl);
+        fast_slv_change = old_slv * LDLM_POOLS_FAST_SLV_CHANGE;
+        do_div(fast_slv_change, 100);
+#endif
+        pl->pl_update_time = cfs_time_current();
+        ldlm_pool_set_slv(pl, lustre_msg_get_slv(req->rq_repmsg));
+        ldlm_pool_set_limit(pl, lustre_msg_get_limit(req->rq_repmsg));
+#ifdef __KERNEL__
+        /* Wake up the pools thread only if the SLV has changed by more
+         * than 5% since the last update; in that case we want to react
+         * asap. Otherwise there is no sense in waking up the pools, as
+         * they are recalculated every 1s anyway. */
+        if (old_slv > ldlm_pool_get_slv(pl) &&
+            old_slv - ldlm_pool_get_slv(pl) > fast_slv_change)
+                ldlm_pools_wakeup();
+        }
+#endif
+        spin_unlock(&pl->pl_lock);
+
+        RETURN(0);
+}
+EXPORT_SYMBOL(ldlm_cli_update_pool);
+
 int ldlm_cli_cancel(struct lustre_handle *lockh)
 {
         struct ldlm_lock *lock;
-        CFS_LIST_HEAD(head);
+        CFS_LIST_HEAD(cancels);
         int rc = 0;
         ENTRY;
@@ -939,8 +985,8 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
         if (rc < 0 || rc == LDLM_FL_LOCAL_ONLY)
                 GOTO(out, rc);
 
-        list_add(&lock->l_bl_ast, &head);
-        rc = ldlm_cli_cancel_req(lock->l_conn_export, &head, 1);
+        list_add(&lock->l_bl_ast, &cancels);
+        rc = ldlm_cli_cancel_req(lock->l_conn_export, &cancels, 1);
         EXIT;
 out:
         LDLM_LOCK_PUT(lock);
@@ -962,14 +1008,21 @@ int ldlm_cli_cancel(struct lustre_handle *lockh)
 int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                           int count, int max, int flags)
 {
+        int rc, added = 0, left, unused;
         cfs_time_t cur = cfs_time_current();
         struct ldlm_lock *lock, *next;
-        int rc, added = 0, left;
         ENTRY;
 
         spin_lock(&ns->ns_unused_lock);
-        count += ns->ns_nr_unused - ns->ns_max_unused;
+        unused = ns->ns_nr_unused;
+
+        if (!ns_connect_lru_resize(ns))
+                count += unused - ns->ns_max_unused;
+
         while (!list_empty(&ns->ns_unused_list)) {
+                struct ldlm_pool *pl;
+                __u64 slv, lvf, lv;
+
                 if (max && added >= max)
                         break;
@@ -983,12 +1036,41 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
                 if (&lock->l_lru == &ns->ns_unused_list)
                         break;
 
-                if ((added >= count) &&
-                    (!(flags & LDLM_CANCEL_AGED) ||
-                     cfs_time_before_64(cur, (__u64)ns->ns_max_age +
-                                        lock->l_last_used)))
-                        break;
+                pl = &ns->ns_pool;
+
+                if (ns_connect_lru_resize(ns)) {
+                        cfs_time_t la;
+                        /* Pay no attention to the SLV if we were asked to
+                         * cancel a particular number of locks (via proc) or
+                         * have already scheduled @added locks for canceling. */
+                        if (count != 0 && added > count)
+                                break;
+
+                        /* Calculate lv for every lock. */
+                        spin_lock(&pl->pl_lock);
+                        slv = ldlm_pool_get_slv(pl);
+                        lvf = atomic_read(&pl->pl_lock_volume_factor);
+                        spin_unlock(&pl->pl_lock);
+
+                        la = cfs_duration_sec(cfs_time_sub(cur,
+                                              lock->l_last_used));
+                        if (la == 0)
+                                la = 1;
+
+                        /* Stop if the SLV has not yet arrived from the
+                         * server, or if this lock's volume lv is smaller
+                         * than the SLV.
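+                         * E.g. (hypothetical numbers): with lvf = 1, 1000
+                         * unused locks in the LRU and a lock last used 30s
+                         * ago, lv = 1 * 30 * 1000 = 30000, so that lock is
+                         * canceled once the SLV drops to 30000 or below.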
*/ + lv = lvf * la * unused; + if (slv == 1 || lv < slv) + break; + } else { + if ((added >= count) && + (!(flags & LDLM_CANCEL_AGED) || + cfs_time_before_64(cur, (__u64)ns->ns_max_age + + lock->l_last_used))) + break; + } + LDLM_LOCK_GET(lock); /* dropped by bl thread */ spin_unlock(&ns->ns_unused_lock); @@ -1032,6 +1114,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, unlock_res_and_lock(lock); spin_lock(&ns->ns_unused_lock); added++; + unused--; } spin_unlock(&ns->ns_unused_lock); @@ -1057,7 +1140,6 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, LDLM_LOCK_PUT(lock); added--; } - } RETURN(added); } @@ -1066,7 +1148,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels, * in a thread and this function will return after the thread has been * asked to call the callback. when called with LDLM_SYNC the blocking * callback will be performed in this function. */ -int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) +int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t sync) { CFS_LIST_HEAD(cancels); int count, rc; @@ -1075,7 +1157,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) #ifndef __KERNEL__ sync = LDLM_SYNC; /* force to be sync in user space */ #endif - count = ldlm_cancel_lru_local(ns, &cancels, 0, 0, 0); + count = ldlm_cancel_lru_local(ns, &cancels, nr, 0, 0); if (sync == LDLM_ASYNC) { struct ldlm_lock *lock, *next; list_for_each_entry_safe(lock, next, &cancels, l_bl_ast) { @@ -1092,7 +1174,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) /* If some locks are left in the list in ASYNC mode, or * this is SYNC mode, cancel the list. */ ldlm_cli_cancel_list(&cancels, count, NULL, DLM_LOCKREQ_OFF); - RETURN(0); + RETURN(count); } /* Find and cancel locally unused locks found on resource, matched to the @@ -1220,7 +1302,7 @@ int ldlm_cli_cancel_list(struct list_head *cancels, int count, CERROR("ldlm_cli_cancel_list: %d\n", res); res = count; } - + count -= res; ldlm_lock_list_put(cancels, l_bl_ast, res); } diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index e277ef6..ac082e7 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -36,6 +36,8 @@ cfs_mem_cache_t *ldlm_resource_slab, *ldlm_lock_slab; +atomic_t ldlm_srv_namespace_nr = ATOMIC_INIT(0); +atomic_t ldlm_cli_namespace_nr = ATOMIC_INIT(0); struct semaphore ldlm_namespace_lock; struct list_head ldlm_namespace_list = CFS_LIST_HEAD_INIT(ldlm_namespace_list); cfs_proc_dir_entry_t *ldlm_type_proc_dir = NULL; @@ -111,43 +113,19 @@ void ldlm_proc_cleanup(void) lprocfs_remove(&ldlm_type_proc_dir); } -static int lprocfs_uint_rd(char *page, char **start, off_t off, - int count, int *eof, void *data) -{ - unsigned int *temp = (unsigned int *)data; - return snprintf(page, count, "%u\n", *temp); -} - -#define MAX_STRING_SIZE 128 -static int lprocfs_uint_wr(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - unsigned *p = data; - char dummy[MAX_STRING_SIZE + 1], *end; - unsigned long tmp; - - dummy[MAX_STRING_SIZE] = '\0'; - if (copy_from_user(dummy, buffer, MAX_STRING_SIZE)) - return -EFAULT; - - tmp = simple_strtoul(dummy, &end, 0); - if (dummy == end) - return -EINVAL; - - *p = (unsigned int)tmp; - return count; -} - -static int lprocfs_read_lru_size(char *page, char **start, off_t off, - int count, int *eof, void *data) +static int lprocfs_rd_lru_size(char *page, char **start, off_t off, + int count, int 
*eof, void *data) { struct ldlm_namespace *ns = data; - return lprocfs_uint_rd(page, start, off, count, eof, - &ns->ns_max_unused); + __u32 *nr = &ns->ns_max_unused; + + if (ns_connect_lru_resize(ns)) + nr = &ns->ns_nr_unused; + return lprocfs_rd_uint(page, start, off, count, eof, nr); } -static int lprocfs_write_lru_size(struct file *file, const char *buffer, - unsigned long count, void *data) +static int lprocfs_wr_lru_size(struct file *file, const char *buffer, + unsigned long count, void *data) { struct ldlm_namespace *ns = data; char dummy[MAX_STRING_SIZE + 1], *end; @@ -161,25 +139,38 @@ static int lprocfs_write_lru_size(struct file *file, const char *buffer, CDEBUG(D_DLMTRACE, "dropping all unused locks from namespace %s\n", ns->ns_name); - tmp = ns->ns_max_unused; - ns->ns_max_unused = 0; - ldlm_cancel_lru(ns, LDLM_SYNC); - ns->ns_max_unused = tmp; + if (ns_connect_lru_resize(ns)) { + /* Try to cancel all @ns_nr_unused locks. */ + ldlm_cancel_lru(ns, ns->ns_nr_unused, LDLM_SYNC); + } else { + tmp = ns->ns_max_unused; + ns->ns_max_unused = 0; + ldlm_cancel_lru(ns, 0, LDLM_SYNC); + ns->ns_max_unused = tmp; + } return count; } tmp = simple_strtoul(dummy, &end, 0); - if (tmp == 0 && *end) { + if (*end) { CERROR("invalid value written\n"); return -EINVAL; } - CDEBUG(D_DLMTRACE, "changing namespace %s max_unused from %u to %u\n", - ns->ns_name, ns->ns_max_unused, (unsigned int)tmp); - ns->ns_max_unused = (unsigned int)tmp; - - ldlm_cancel_lru(ns, LDLM_ASYNC); - + if (ns_connect_lru_resize(ns)) { + if (tmp > ns->ns_nr_unused) + tmp = ns->ns_nr_unused; + tmp = ns->ns_nr_unused - tmp; + + CDEBUG(D_DLMTRACE, "changing namespace %s unused locks from %u to %u\n", + ns->ns_name, ns->ns_nr_unused, (unsigned int)tmp); + ldlm_cancel_lru(ns, (unsigned int)tmp, LDLM_ASYNC); + } else { + CDEBUG(D_DLMTRACE, "changing namespace %s max_unused from %u to %u\n", + ns->ns_name, ns->ns_max_unused, (unsigned int)tmp); + ns->ns_max_unused = (unsigned int)tmp; + ldlm_cancel_lru(ns, 0, LDLM_ASYNC); + } return count; } @@ -210,42 +201,42 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns) snprintf(lock_name, MAX_STRING_SIZE, "%s/lock_unused_count", ns->ns_name); lock_vars[0].data = &ns->ns_nr_unused; - lock_vars[0].read_fptr = lprocfs_uint_rd; + lock_vars[0].read_fptr = lprocfs_rd_uint; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_size", ns->ns_name); lock_vars[0].data = ns; - lock_vars[0].read_fptr = lprocfs_read_lru_size; - lock_vars[0].write_fptr = lprocfs_write_lru_size; + lock_vars[0].read_fptr = lprocfs_rd_lru_size; + lock_vars[0].write_fptr = lprocfs_wr_lru_size; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); snprintf(lock_name, MAX_STRING_SIZE, "%s/lru_max_age", ns->ns_name); lock_vars[0].data = &ns->ns_max_age; - lock_vars[0].read_fptr = lprocfs_uint_rd; - lock_vars[0].write_fptr = lprocfs_uint_wr; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lock_vars[0].write_fptr = lprocfs_wr_uint; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); } else { snprintf(lock_name, MAX_STRING_SIZE, "%s/max_nolock_bytes", ns->ns_name); lock_vars[0].data = &ns->ns_max_nolock_size; - lock_vars[0].read_fptr = lprocfs_uint_rd; - lock_vars[0].write_fptr = lprocfs_uint_wr; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lock_vars[0].write_fptr = lprocfs_wr_uint; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); snprintf(lock_name, MAX_STRING_SIZE, "%s/contention_seconds", ns->ns_name); lock_vars[0].data = &ns->ns_contention_time; - lock_vars[0].read_fptr = lprocfs_uint_rd; - 
lock_vars[0].write_fptr = lprocfs_uint_wr; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lock_vars[0].write_fptr = lprocfs_wr_uint; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); snprintf(lock_name, MAX_STRING_SIZE, "%s/contended_locks", ns->ns_name); lock_vars[0].data = &ns->ns_contended_locks; - lock_vars[0].read_fptr = lprocfs_uint_rd; - lock_vars[0].write_fptr = lprocfs_uint_wr; + lock_vars[0].read_fptr = lprocfs_rd_uint; + lock_vars[0].write_fptr = lprocfs_wr_uint; lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0); } } @@ -254,20 +245,27 @@ void ldlm_proc_namespace(struct ldlm_namespace *ns) #define ldlm_proc_namespace(ns) do {} while (0) #endif /* LPROCFS */ -struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) +static atomic_t *ldlm_namespace_nr(ldlm_side_t client) +{ + return client == LDLM_NAMESPACE_SERVER ? + &ldlm_srv_namespace_nr : &ldlm_cli_namespace_nr; +} + +struct ldlm_namespace *ldlm_namespace_new(char *name, ldlm_side_t client, + ldlm_appetite_t apt) { struct ldlm_namespace *ns = NULL; struct list_head *bucket; - int rc; + int rc, idx; ENTRY; - rc = ldlm_get_ref(); + rc = ldlm_get_ref(client); if (rc) { CERROR("ldlm_get_ref failed: %d\n", rc); RETURN(NULL); } - OBD_ALLOC(ns, sizeof(*ns)); + OBD_ALLOC_PTR(ns); if (!ns) GOTO(out_ref, NULL); @@ -279,6 +277,7 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) if (!ns->ns_name) GOTO(out_hash, NULL); + ns->ns_appetite = apt; strcpy(ns->ns_name, name); CFS_INIT_LIST_HEAD(&ns->ns_root_list); @@ -302,19 +301,34 @@ struct ldlm_namespace *ldlm_namespace_new(char *name, __u32 client) ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE; spin_lock_init(&ns->ns_unused_lock); + ns->ns_connect_flags = 0; mutex_down(&ldlm_namespace_lock); list_add(&ns->ns_list_chain, &ldlm_namespace_list); + idx = atomic_read(ldlm_namespace_nr(client)); + atomic_inc(ldlm_namespace_nr(client)); mutex_up(&ldlm_namespace_lock); + ldlm_proc_namespace(ns); + + rc = ldlm_pool_init(&ns->ns_pool, ns, idx, client); + if (rc) { + CERROR("can't initialize lock pool, rc %d\n", rc); + GOTO(out_del, rc); + } RETURN(ns); +out_del: + mutex_down(&ldlm_namespace_lock); + list_del(&ns->ns_list_chain); + atomic_dec(ldlm_namespace_nr(client)); + mutex_up(&ldlm_namespace_lock); out_hash: POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE); OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE); out_ns: - OBD_FREE(ns, sizeof(*ns)); + OBD_FREE_PTR(ns); out_ref: - ldlm_put_ref(0); + ldlm_put_ref(client, 0); RETURN(NULL); } @@ -451,6 +465,8 @@ int ldlm_namespace_free_prior(struct ldlm_namespace *ns) mutex_down(&ldlm_namespace_lock); list_del(&ns->ns_list_chain); + atomic_dec(ldlm_namespace_nr(ns->ns_client)); + ldlm_pool_fini(&ns->ns_pool); mutex_up(&ldlm_namespace_lock); /* At shutdown time, don't call the cancellation callback */ @@ -481,6 +497,7 @@ int ldlm_namespace_free_prior(struct ldlm_namespace *ns) int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force) { + ldlm_side_t client; ENTRY; if (!ns) RETURN(ELDLM_OK); @@ -497,13 +514,12 @@ int ldlm_namespace_free_post(struct ldlm_namespace *ns, int force) } } #endif - + client = ns->ns_client; POISON(ns->ns_hash, 0x5a, sizeof(*ns->ns_hash) * RES_HASH_SIZE); OBD_VFREE(ns->ns_hash, sizeof(*ns->ns_hash) * RES_HASH_SIZE); OBD_FREE(ns->ns_name, strlen(ns->ns_name) + 1); - OBD_FREE(ns, sizeof(*ns)); - - ldlm_put_ref(force); + OBD_FREE_PTR(ns); + ldlm_put_ref(client, force); RETURN(ELDLM_OK); } diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 8c618c5..8c8afb5 
100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -162,6 +162,9 @@ static int client_common_fill_super(struct super_block *sb, data->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_IBITS | OBD_CONNECT_JOIN | OBD_CONNECT_ATTRFID | OBD_CONNECT_NODEVOH | OBD_CONNECT_CANCELSET | OBD_CONNECT_AT; +#ifdef HAVE_LRU_RESIZE_SUPPORT + data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE; +#endif #ifdef CONFIG_FS_POSIX_ACL data->ocd_connect_flags |= OBD_CONNECT_ACL; #endif @@ -263,6 +266,10 @@ static int client_common_fill_super(struct super_block *sb, OBD_CONNECT_REQPORTAL | OBD_CONNECT_BRW_SIZE | OBD_CONNECT_SRVLOCK | OBD_CONNECT_CANCELSET | OBD_CONNECT_AT; +#ifdef HAVE_LRU_RESIZE_SUPPORT + data->ocd_connect_flags |= OBD_CONNECT_LRU_RESIZE; +#endif + CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d " "ocd_grant: %d\n", data->ocd_connect_flags, data->ocd_version, data->ocd_grant); diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index b985e63..e7a1fc6 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -1943,7 +1943,8 @@ static int mds_setup(struct obd_device *obd, obd_count len, void *buf) mds->mds_evict_ost_nids = 1; sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid); - obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER); + obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER, + LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) { mds_cleanup(obd); GOTO(err_ops, rc = -ENOMEM); diff --git a/lustre/mgs/mgs_handler.c b/lustre/mgs/mgs_handler.c index e17634d..7fa1198 100644 --- a/lustre/mgs/mgs_handler.c +++ b/lustre/mgs/mgs_handler.c @@ -138,11 +138,10 @@ static int mgs_setup(struct obd_device *obd, obd_count len, void *buf) GOTO(err_put, rc = PTR_ERR(obd->obd_fsops)); /* namespace for mgs llog */ - obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER); - if (obd->obd_namespace == NULL) { - mgs_cleanup(obd); + obd->obd_namespace = ldlm_namespace_new("MGS", LDLM_NAMESPACE_SERVER, + LDLM_NAMESPACE_MODEST); + if (obd->obd_namespace == NULL) GOTO(err_ops, rc = -ENOMEM); - } /* ldlm setup */ ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL, @@ -251,11 +250,11 @@ static int mgs_cleanup(struct obd_device *obd) struct mgs_obd *mgs = &obd->u.mgs; ENTRY; - ping_evictor_stop(); - if (mgs->mgs_sb == NULL) RETURN(0); + ping_evictor_stop(); + ptlrpc_unregister_service(mgs->mgs_service); mgs_cleanup_fsdb_list(obd); diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 557fef6..c6883e4 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -38,6 +38,8 @@ #if defined(LPROCFS) +#define MAX_STRING_SIZE 128 + /* for bug 10866, global variable */ DECLARE_RWSEM(_lprocfs_lock); EXPORT_SYMBOL(_lprocfs_lock); @@ -342,6 +344,31 @@ struct proc_dir_entry *lprocfs_register(const char *name, } /* Generic callbacks */ +int lprocfs_rd_uint(char *page, char **start, off_t off, + int count, int *eof, void *data) +{ + unsigned int *temp = (unsigned int *)data; + return snprintf(page, count, "%u\n", *temp); +} + +int lprocfs_wr_uint(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + unsigned *p = data; + char dummy[MAX_STRING_SIZE + 1], *end; + unsigned long tmp; + + dummy[MAX_STRING_SIZE] = '\0'; + if (copy_from_user(dummy, buffer, MAX_STRING_SIZE)) + return -EFAULT; + + tmp = simple_strtoul(dummy, &end, 0); + if (dummy == end) + return -EINVAL; + + *p = (unsigned int)tmp; + return count; +} int lprocfs_rd_u64(char 
*page, char **start, off_t off, int count, int *eof, void *data) @@ -360,6 +387,24 @@ int lprocfs_rd_atomic(char *page, char **start, off_t off, return snprintf(page, count, "%d\n", atomic_read(atom)); } +int lprocfs_wr_atomic(struct file *file, const char *buffer, + unsigned long count, void *data) +{ + atomic_t *atm = data; + int val = 0; + int rc; + + rc = lprocfs_write_helper(buffer, count, &val); + if (rc < 0) + return rc; + + if (val <= 0) + return -ERANGE; + + atomic_set(atm, val); + return count; +} + int lprocfs_rd_uuid(char *page, char **start, off_t off, int count, int *eof, void *data) { @@ -653,6 +698,7 @@ static const char *obd_connect_names[] = { "early_lock_cancel", "size_on_mds", "adaptive_timeout", + "lru_resize", NULL }; @@ -1528,6 +1574,9 @@ EXPORT_SYMBOL(lprocfs_exp_cleanup); EXPORT_SYMBOL(lprocfs_rd_u64); EXPORT_SYMBOL(lprocfs_rd_atomic); +EXPORT_SYMBOL(lprocfs_wr_atomic); +EXPORT_SYMBOL(lprocfs_rd_uint); +EXPORT_SYMBOL(lprocfs_wr_uint); EXPORT_SYMBOL(lprocfs_rd_uuid); EXPORT_SYMBOL(lprocfs_rd_name); EXPORT_SYMBOL(lprocfs_rd_fstype); diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 541cf5d..820e529 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -465,7 +465,8 @@ static int echo_setup(struct obd_device *obd, obd_count len, void *buf) obd->u.echo.eo_lastino = ECHO_INIT_OBJID; obd->obd_namespace = ldlm_namespace_new("echo-tgt", - LDLM_NAMESPACE_SERVER); + LDLM_NAMESPACE_SERVER, + LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) { LBUG(); RETURN(-ENOMEM); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index 423e3d2..ee2c4d0 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1700,7 +1700,8 @@ int filter_common_setup(struct obd_device *obd, obd_count len, void *buf, filter->fo_fmd_max_age = FILTER_FMD_MAX_AGE_DEFAULT; sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid); - obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER); + obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER, + LDLM_NAMESPACE_GREEDY); if (obd->obd_namespace == NULL) GOTO(err_post, rc = -ENOMEM); obd->obd_namespace->ns_lvbp = obd; diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index cc625c6..3adcf81 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -9,6 +9,7 @@ ldlm_objs += $(LDLM)ldlm_resource.o $(LDLM)ldlm_lib.o ldlm_objs += $(LDLM)ldlm_plain.o $(LDLM)ldlm_extent.o ldlm_objs += $(LDLM)ldlm_request.o $(LDLM)ldlm_lockd.o ldlm_objs += $(LDLM)ldlm_flock.o $(LDLM)ldlm_inodebits.o +ldlm_objs += $(LDLM)ldlm_pool.o ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o recov_thread.o ptlrpc_objs += llog_net.o llog_client.o llog_server.o import.o ptlrpcd.o diff --git a/lustre/ptlrpc/autoMakefile.am b/lustre/ptlrpc/autoMakefile.am index 80f9c47..f10b84d 100644 --- a/lustre/ptlrpc/autoMakefile.am +++ b/lustre/ptlrpc/autoMakefile.am @@ -13,7 +13,8 @@ LDLM_COMM_SOURCES= $(top_srcdir)/lustre/ldlm/l_lock.c \ $(top_srcdir)/lustre/ldlm/ldlm_lockd.c \ $(top_srcdir)/lustre/ldlm/ldlm_internal.h \ $(top_srcdir)/lustre/ldlm/ldlm_inodebits.c \ - $(top_srcdir)/lustre/ldlm/ldlm_flock.c + $(top_srcdir)/lustre/ldlm/ldlm_flock.c \ + $(top_srcdir)/lustre/ldlm/ldlm_pool.c COMMON_SOURCES = client.c recover.c connection.c niobuf.c pack_generic.c \ events.c ptlrpc_module.c service.c pinger.c recov_thread.c llog_net.c \ diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 
75b62b9..741dcc6 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -807,19 +807,27 @@ static int after_reply(struct ptlrpc_request *req) } rc = ptlrpc_check_status(req); - - /* Either we've been evicted, or the server has failed for - * some reason. Try to reconnect, and if that fails, punt to the - * upcall. */ - if ((rc == -ENOTCONN) || (rc == -ENODEV)) { - if (req->rq_send_state != LUSTRE_IMP_FULL || - imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) { - RETURN(-ENOTCONN); + if (rc) { + /* Either we've been evicted, or the server has failed for + * some reason. Try to reconnect, and if that fails, punt to + * the upcall. */ + if (rc == -ENOTCONN || rc == -ENODEV) { + if (req->rq_send_state != LUSTRE_IMP_FULL || + imp->imp_obd->obd_no_recov || imp->imp_dlm_fake) { + RETURN(-ENOTCONN); + } + ptlrpc_request_handle_notconn(req); + RETURN(rc); + } + } else { + /* Check whether the server sent an SLV. Do it only for + * RPCs with rc == 0. */ + if (imp->imp_obd->obd_namespace) { + /* The disconnect RPC is sent when the namespace is + * already destroyed, so check for that and do not + * try to update the pool. */ + ldlm_cli_update_pool(req); + } - - ptlrpc_request_handle_notconn(req); - - RETURN(rc); } /* Store transno in reqmsg for replay. */ diff --git a/lustre/ptlrpc/import.c b/lustre/ptlrpc/import.c index c5106ba..62c92a4 100644 --- a/lustre/ptlrpc/import.c +++ b/lustre/ptlrpc/import.c @@ -713,7 +713,6 @@ finish: ocd = lustre_swab_repbuf(request, REPLY_REC_OFF, sizeof(*ocd), lustre_swab_connect); - spin_lock(&imp->imp_lock); list_del(&imp->imp_conn_current->oic_item); list_add(&imp->imp_conn_current->oic_item, &imp->imp_conn_list); @@ -746,6 +745,7 @@ finish: GOTO(out, rc = -ENODEV); } exp->exp_connect_flags = ocd->ocd_connect_flags; + imp->imp_obd->obd_self_export->exp_connect_flags = ocd->ocd_connect_flags; class_export_put(exp); obd_import_event(imp->imp_obd, imp, IMP_EVENT_OCD); @@ -787,6 +787,8 @@ finish: ocd->ocd_brw_size >> CFS_PAGE_SHIFT; } + imp->imp_obd->obd_namespace->ns_connect_flags = ocd->ocd_connect_flags; + if ((ocd->ocd_connect_flags & OBD_CONNECT_AT) && (imp->imp_msg_magic == LUSTRE_MSG_MAGIC_V2) /* We need a per-message support flag, because diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 83bbe3a..eea15f0 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1425,6 +1425,102 @@ int lustre_msg_get_status(struct lustre_msg *msg) } } +__u64 lustre_msg_get_slv(struct lustre_msg *msg) +{ + switch (msg->lm_magic) { + case LUSTRE_MSG_MAGIC_V1: + case LUSTRE_MSG_MAGIC_V1_SWABBED: + return 1; + case LUSTRE_MSG_MAGIC_V2: + case LUSTRE_MSG_MAGIC_V2_SWABBED: { + struct ptlrpc_body *pb; + + pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + if (!pb) { + CERROR("invalid msg %p: no ptlrpc body!\n", msg); + return -EINVAL; + } + return pb->pb_slv; + } + default: + CERROR("invalid msg magic %x\n", msg->lm_magic); + return -EINVAL; + } +} + + +void lustre_msg_set_slv(struct lustre_msg *msg, __u64 slv) +{ + switch (msg->lm_magic) { + case LUSTRE_MSG_MAGIC_V1: + case LUSTRE_MSG_MAGIC_V1_SWABBED: + return; + case LUSTRE_MSG_MAGIC_V2: + case LUSTRE_MSG_MAGIC_V2_SWABBED: { + struct ptlrpc_body *pb; + + pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + if (!pb) { + CERROR("invalid msg %p: no ptlrpc body!\n", msg); + return; + } + pb->pb_slv = slv; + return; + } + default: + CERROR("invalid msg magic %x\n", msg->lm_magic); + return; + } +} + +__u32 lustre_msg_get_limit(struct lustre_msg *msg) +{ + switch 
(msg->lm_magic) { + case LUSTRE_MSG_MAGIC_V1: + case LUSTRE_MSG_MAGIC_V1_SWABBED: + return 1; + case LUSTRE_MSG_MAGIC_V2: + case LUSTRE_MSG_MAGIC_V2_SWABBED: { + struct ptlrpc_body *pb; + + pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + if (!pb) { + CERROR("invalid msg %p: no ptlrpc body!\n", msg); + return -EINVAL; + } + return pb->pb_limit; + } + default: + CERROR("invalid msg magic %x\n", msg->lm_magic); + return -EINVAL; + } +} + + +void lustre_msg_set_limit(struct lustre_msg *msg, __u64 limit) +{ + switch (msg->lm_magic) { + case LUSTRE_MSG_MAGIC_V1: + case LUSTRE_MSG_MAGIC_V1_SWABBED: + return; + case LUSTRE_MSG_MAGIC_V2: + case LUSTRE_MSG_MAGIC_V2_SWABBED: { + struct ptlrpc_body *pb; + + pb = lustre_msg_buf_v2(msg, MSG_PTLRPC_BODY_OFF, sizeof(*pb)); + if (!pb) { + CERROR("invalid msg %p: no ptlrpc body!\n", msg); + return; + } + pb->pb_limit = limit; + return; + } + default: + CERROR("invalid msg magic %x\n", msg->lm_magic); + return; + } +} + __u32 lustre_msg_get_conn_cnt(struct lustre_msg *msg) { switch (msg->lm_magic) { @@ -1775,9 +1871,8 @@ void lustre_swab_ptlrpc_body(struct ptlrpc_body *b) __swab32s (&b->pb_conn_cnt); __swab32s (&b->pb_timeout); __swab32s (&b->pb_service_time); - CLASSERT(offsetof(typeof(*b), pb_padding_1) != 0); - CLASSERT(offsetof(typeof(*b), pb_padding_2) != 0); - CLASSERT(offsetof(typeof(*b), pb_padding_3) != 0); + __swab64s (&b->pb_slv); + __swab32s (&b->pb_limit); } void lustre_swab_connect(struct obd_connect_data *ocd) diff --git a/lustre/ptlrpc/pinger.c b/lustre/ptlrpc/pinger.c index 31c43b9..6f49146 100644 --- a/lustre/ptlrpc/pinger.c +++ b/lustre/ptlrpc/pinger.c @@ -45,7 +45,8 @@ int ptlrpc_ping(struct obd_import *imp) int rc = 0; ENTRY; - req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, 1, NULL, NULL); + req = ptlrpc_prep_req(imp, LUSTRE_OBD_VERSION, OBD_PING, + 1, NULL, NULL); if (req) { DEBUG_REQ(D_INFO, req, "pinging %s->%s", imp->imp_obd->obd_uuid.uuid, diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index ba4c379..66d51b9 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -231,6 +231,10 @@ EXPORT_SYMBOL(lustre_msg_get_last_xid); EXPORT_SYMBOL(lustre_msg_get_last_committed); EXPORT_SYMBOL(lustre_msg_get_transno); EXPORT_SYMBOL(lustre_msg_get_status); +EXPORT_SYMBOL(lustre_msg_get_slv); +EXPORT_SYMBOL(lustre_msg_get_limit); +EXPORT_SYMBOL(lustre_msg_set_slv); +EXPORT_SYMBOL(lustre_msg_set_limit); EXPORT_SYMBOL(lustre_msg_get_conn_cnt); EXPORT_SYMBOL(lustre_msg_is_v1); EXPORT_SYMBOL(lustre_msg_get_magic); diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index f9ca561..60faf24 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -12,8 +12,8 @@ void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' * (make -C lustre/utils newwirecheck) - * running on Linux pancake 2.6.18-skas3-v9-pre9 #1 Tue Feb 20 10:37:58 PST 2007 i686 i686 i3 - * with gcc version 3.4.4 */ + * running on Linux hideous 2.6.9-prep.qp3.5.34.4qsnet #3 Mon Aug 13 08:38:55 EEST 2007 i686 + * with gcc version 3.4.6 20060404 (Red Hat 3.4.6-3) */ /* Constants... 
*/ @@ -381,18 +381,14 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ptlrpc_body, pb_service_time)); LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_service_time) == 4, " found %lld\n", (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_service_time)); - LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_1) == 76, " found %lld\n", - (long long)(int)offsetof(struct ptlrpc_body, pb_padding_1)); - LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_1) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_1)); - LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_2) == 80, " found %lld\n", - (long long)(int)offsetof(struct ptlrpc_body, pb_padding_2)); - LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_2) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_2)); - LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_3) == 84, " found %lld\n", - (long long)(int)offsetof(struct ptlrpc_body, pb_padding_3)); - LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_3) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_3)); + LASSERTF((int)offsetof(struct ptlrpc_body, pb_slv) == 80, " found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body, pb_slv)); + LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_slv) == 8, " found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_slv)); + LASSERTF((int)offsetof(struct ptlrpc_body, pb_limit) == 76, " found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body, pb_limit)); + LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit)); /* Checks for struct obd_connect_data */ LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n", @@ -469,6 +465,7 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL); CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL); CLASSERT(OBD_CONNECT_AT == 0x01000000ULL); + CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL); /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n", @@ -2107,4 +2104,3 @@ void lustre_assert_wire_constants(void) LASSERTF((int)sizeof(((struct lustre_disk_data *)0)->ldd_params) == 4096, " found %lld\n", (long long)(int)sizeof(((struct lustre_disk_data *)0)->ldd_params)); } - diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index fb4b302..1aaafeb 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -3919,7 +3919,57 @@ test_119b() # bug 11737 } run_test 119b "Sparse directIO read must return actual read amount" +LDLM_POOL_CTL_RECALC=1 +LDLM_POOL_CTL_SHRINK=2 + +disable_pool_recalc() { + NSDIR=`find $LPROC/ldlm/namespaces | grep $1 | head -1` + if test -f $NSDIR/pool/control; then + NS=`basename $NSDIR` + echo "disable pool recalc for $NS pool" + CONTROL=`cat $NSDIR/pool/control` + CONTROL=$((CONTROL & ~LDLM_POOL_CTL_RECALC)) + echo "$CONTROL" > $NSDIR/pool/control + fi +} + +enable_pool_recalc() { + NSDIR=`find $LPROC/ldlm/namespaces | grep $1 | head -1` + if test -f $NSDIR/pool/control; then + NS=`basename $NSDIR` + echo "enable pool recalc $NS pool" + CONTROL=`cat $NSDIR/pool/control` + CONTROL=$((CONTROL | LDLM_POOL_CTL_RECALC)) + echo "$CONTROL" > $NSDIR/pool/control + fi +} + +disable_pool_shrink() { + NSDIR=`find $LPROC/ldlm/namespaces | grep $1 | head -1` + if test -f $NSDIR/pool/control; then + NS=`basename $NSDIR` + echo "disable pool shrink for $NS 
pool" + CONTROL=`cat $NSDIR/pool/control` + CONTROL=$((CONTROL & ~LDLM_POOL_CTL_SHRINK)) + echo "$CONTROL" > $NSDIR/pool/control + fi +} + +enable_pool_shrink() { + NSDIR=`find $LPROC/ldlm/namespaces | grep $1 | head -1` + if test -f $NSDIR/pool/control; then + NS=`basename $NSDIR` + echo "enable pool shrink for $NS pool" + CONTROL=`cat $NSDIR/pool/control` + CONTROL=$((CONTROL | LDLM_POOL_CTL_SHRINK)) + echo "$CONTROL" > $NSDIR/pool/control + fi +} + test_120a() { + disable_pool_recalc mdc + disable_pool_shrink mdc + disable_pool_shrink "mds-$FSNAME" mkdir $DIR/$tdir cancel_lru_locks mdc stat $DIR/$tdir > /dev/null @@ -3934,6 +3984,9 @@ test_120a() { run_test 120a "Early Lock Cancel: mkdir test ===================" test_120b() { + disable_pool_recalc mdc + disable_pool_shrink mdc + disable_pool_shrink mds-lustre mkdir $DIR/$tdir cancel_lru_locks mdc stat $DIR/$tdir > /dev/null @@ -3948,6 +4001,9 @@ test_120b() { run_test 120b "Early Lock Cancel: create test ==================" test_120c() { + disable_pool_recalc mdc + disable_pool_shrink mdc + disable_pool_shrink "mds-$FSNAME" mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2 touch $DIR/$tdir/d1/f1 cancel_lru_locks mdc @@ -3963,6 +4019,9 @@ test_120c() { run_test 120c "Early Lock Cancel: link test ====================" test_120d() { + disable_pool_recalc mdc + disable_pool_shrink mdc + disable_pool_shrink "mds-$FSNAME" touch $DIR/$tdir cancel_lru_locks mdc stat $DIR/$tdir > /dev/null @@ -3977,6 +4036,9 @@ test_120d() { run_test 120d "Early Lock Cancel: setattr test =================" test_120e() { + disable_pool_recalc mdc + disable_pool_shrink mdc + disable_pool_shrink "mds-$FSNAME" mkdir $DIR/$tdir dd if=/dev/zero of=$DIR/$tdir/f1 count=1 cancel_lru_locks mdc @@ -3994,6 +4056,9 @@ test_120e() { run_test 120e "Early Lock Cancel: unlink test ==================" test_120f() { + disable_pool_recalc mdc + disable_pool_shrink mdc + disable_pool_shrink "mds-$FSNAME" mkdir -p $DIR/$tdir/d1 $DIR/$tdir/d2 dd if=/dev/zero of=$DIR/$tdir/d1/f1 count=1 dd if=/dev/zero of=$DIR/$tdir/d2/f2 count=1 @@ -4013,6 +4078,9 @@ test_120f() { run_test 120f "Early Lock Cancel: rename test ==================" test_120g() { + disable_pool_recalc mdc + disable_pool_shrink mdc + disable_pool_shrink "mds-$FSNAME" count=10000 echo create $count files mkdir $DIR/$tdir @@ -4118,6 +4186,51 @@ test_123() # statahead(bug 11401) } run_test 123 "verify statahead work" +test_124() { + NSDIR=`find $LPROC/ldlm/namespaces | grep mdc | head -1` + + if ! test -f $NSDIR/pool/stats; then + skip "lru resize is not enabled!" + return + fi + + enable_pool_recalc mdc + disable_pool_shrink "mds-$FSNAME" + disable_pool_shrink mdc + + LIMIT=`cat $NSDIR/pool/limit` + LIMIT=$(($LIMIT+$LIMIT*5/100)) + mkdir $DIR/$tdir + log "create $LIMIT files at $DIR/$tdir" + createmany -o $DIR/$tdir/f $LIMIT + ls -la $DIR/$tdir + + LRU_SIZE_B=`cat $NSDIR/lru_size` + log "created $LRU_SIZE_B locks" + + # locks should live 10h on clients at max. Thus, to make them expire in 2 min + # we made lock_volume_factor = (10h * 60m) / 2m == 300, so that, to have all + # locks expired in 2 min we need to speed things up by factor 300 + log "make client drop locks 300 times faster so that 2m wait is enough" + echo "300" > $NSDIR/pool/lock_volume_factor + log "sleep for 2m" + sleep 2m + LRU_SIZE_A=`cat $NSDIR/lru_size` + echo "1" > $NSDIR/pool/lock_volume_factor + + [ $LRU_SIZE_B -gt $LRU_SIZE_A ] || { + error "No locks dropped in 2m. 
LRU size: $LRU_SIZE_A" + enable_pool_shrink mdc + return + } + + log "Dropped "$((LRU_SIZE_B-LRU_SIZE_A))" locks in 2m" + enable_pool_shrink mdc + log "unlink $LIMIT files at $DIR/$tdir" + unlinkmany $DIR/$tdir/f $LIMIT > /dev/null 2>&1 +} +run_test 124 "lru resize =======================================" + TMPDIR=$OLDTMPDIR TMP=$OLDTMP HOME=$OLDHOME diff --git a/lustre/utils/lctl.c b/lustre/utils/lctl.c index 3d9fbc0..9a70047 100644 --- a/lustre/utils/lctl.c +++ b/lustre/utils/lctl.c @@ -23,8 +23,6 @@ * */ - - #include #include #include diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index a843db8..3829894 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -135,9 +135,8 @@ check_ptlrpc_body(void) CHECK_MEMBER(ptlrpc_body, pb_conn_cnt); CHECK_MEMBER(ptlrpc_body, pb_timeout); CHECK_MEMBER(ptlrpc_body, pb_service_time); - CHECK_MEMBER(ptlrpc_body, pb_padding_1); - CHECK_MEMBER(ptlrpc_body, pb_padding_2); - CHECK_MEMBER(ptlrpc_body, pb_padding_3); + CHECK_MEMBER(ptlrpc_body, pb_slv); + CHECK_MEMBER(ptlrpc_body, pb_limit); } static void check_obd_connect_data(void) @@ -181,6 +180,7 @@ static void check_obd_connect_data(void) CHECK_CDEFINE(OBD_CONNECT_CANCELSET); CHECK_CDEFINE(OBD_CONNECT_SOM); CHECK_CDEFINE(OBD_CONNECT_AT); + CHECK_CDEFINE(OBD_CONNECT_LRU_RESIZE); } static void diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 992292c..51379ff 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -28,8 +28,8 @@ void lustre_assert_wire_constants(void) { /* Wire protocol assertions generated by 'wirecheck' * (make -C lustre/utils newwirecheck) - * running on Linux pancake 2.6.18-skas3-v9-pre9 #1 Tue Feb 20 10:37:58 PST 2007 i686 i686 i3 - * with gcc version 3.4.4 */ + * running on Linux hideous 2.6.9-prep.qp3.5.34.4qsnet #3 Mon Aug 13 08:38:55 EEST 2007 i686 + * with gcc version 3.4.6 20060404 (Red Hat 3.4.6-3) */ /* Constants... 
*/ @@ -397,18 +397,14 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct ptlrpc_body, pb_service_time)); LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_service_time) == 4, " found %lld\n", (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_service_time)); - LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_1) == 76, " found %lld\n", - (long long)(int)offsetof(struct ptlrpc_body, pb_padding_1)); - LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_1) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_1)); - LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_2) == 80, " found %lld\n", - (long long)(int)offsetof(struct ptlrpc_body, pb_padding_2)); - LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_2) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_2)); - LASSERTF((int)offsetof(struct ptlrpc_body, pb_padding_3) == 84, " found %lld\n", - (long long)(int)offsetof(struct ptlrpc_body, pb_padding_3)); - LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_padding_3) == 4, " found %lld\n", - (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_padding_3)); + LASSERTF((int)offsetof(struct ptlrpc_body, pb_slv) == 80, " found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body, pb_slv)); + LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_slv) == 8, " found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_slv)); + LASSERTF((int)offsetof(struct ptlrpc_body, pb_limit) == 76, " found %lld\n", + (long long)(int)offsetof(struct ptlrpc_body, pb_limit)); + LASSERTF((int)sizeof(((struct ptlrpc_body *)0)->pb_limit) == 4, " found %lld\n", + (long long)(int)sizeof(((struct ptlrpc_body *)0)->pb_limit)); /* Checks for struct obd_connect_data */ LASSERTF((int)sizeof(struct obd_connect_data) == 72, " found %lld\n", @@ -485,6 +481,7 @@ void lustre_assert_wire_constants(void) CLASSERT(OBD_CONNECT_CANCELSET == 0x400000ULL); CLASSERT(OBD_CONNECT_SOM == 0x00800000ULL); CLASSERT(OBD_CONNECT_AT == 0x01000000ULL); + CLASSERT(OBD_CONNECT_LRU_RESIZE == 0x02000000ULL); /* Checks for struct obdo */ LASSERTF((int)sizeof(struct obdo) == 208, " found %lld\n", @@ -2123,4 +2120,3 @@ void lustre_assert_wire_constants(void) LASSERTF((int)sizeof(((struct lustre_disk_data *)0)->ldd_params) == 4096, " found %lld\n", (long long)(int)sizeof(((struct lustre_disk_data *)0)->ldd_params)); } - -- 1.8.3.1