#include <lustre_param.h>
#include <lustre_fid.h>
#include <obd_class.h>
+#include <obd.h>
+#include <lustre_net.h>
#include "osc_internal.h"
#include "osc_cl_internal.h"
+atomic_t osc_pool_req_count;
+unsigned int osc_reqpool_maxreqcount;
+struct ptlrpc_request_pool *osc_rq_pool;
+
+/* Maximum memory used by the request pool, in MB */
+static unsigned int osc_reqpool_mem_max = 5;
+module_param(osc_reqpool_mem_max, uint, 0444);
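+/* Read-only at runtime (0444): the value can only be set at module load
+ * time, e.g. "modprobe osc osc_reqpool_mem_max=16" (assuming the module
+ * is loaded as "osc"), and is visible under /sys/module/osc/parameters/. */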
+
struct osc_brw_async_args {
struct obdo *aa_oa;
int aa_requested_nob;
struct osc_enqueue_args {
struct obd_export *oa_exp;
- ldlm_type_t oa_type;
- ldlm_mode_t oa_mode;
+ enum ldlm_type oa_type;
+ enum ldlm_mode oa_mode;
__u64 *oa_flags;
osc_enqueue_upcall_f oa_upcall;
void *oa_cookie;
* locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
struct list_head *cancels,
- ldlm_mode_t mode, __u64 lock_flags)
+ enum ldlm_mode mode, __u64 lock_flags)
{
- struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
- struct ldlm_res_id res_id;
- struct ldlm_resource *res;
- int count;
- ENTRY;
+ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
+ struct ldlm_res_id res_id;
+ struct ldlm_resource *res;
+ int count;
+ ENTRY;
/* Return, i.e. cancel nothing, only if ELC is supported (flag in
* export) but disabled through procfs (flag in NS).
oa->o_undirty = 0;
} else if (unlikely(atomic_long_read(&obd_dirty_pages) -
atomic_long_read(&obd_dirty_transit_pages) >
- (obd_max_dirty_pages + 1))) {
+ (long)(obd_max_dirty_pages + 1))) {
/* The atomic_read() allowing the atomic_inc() are
* not covered by a lock thus they may safely race and trip
* this CERROR() unless we add in a small fudge factor (+1). */
- CERROR("%s: dirty %ld - %ld > system dirty_max %lu\n",
- cli->cl_import->imp_obd->obd_name,
- atomic_long_read(&obd_dirty_pages),
+ CERROR("%s: dirty %ld - %ld > system dirty_max %ld\n",
+ cli_name(cli), atomic_long_read(&obd_dirty_pages),
atomic_long_read(&obd_dirty_transit_pages),
obd_max_dirty_pages);
oa->o_undirty = 0;
static int osc_add_shrink_grant(struct client_obd *client)
{
- int rc;
+ int rc;
- rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
- TIMEOUT_GRANT,
- osc_grant_shrink_grant_cb, NULL,
- &client->cl_grant_shrink_list);
- if (rc) {
- CERROR("add grant client %s error %d\n",
- client->cl_import->imp_obd->obd_name, rc);
- return rc;
- }
- CDEBUG(D_CACHE, "add grant client %s \n",
- client->cl_import->imp_obd->obd_name);
- osc_update_next_shrink(client);
- return 0;
+ rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
+ TIMEOUT_GRANT,
+ osc_grant_shrink_grant_cb, NULL,
+ &client->cl_grant_shrink_list);
+ if (rc) {
+ CERROR("add grant client %s error %d\n", cli_name(client), rc);
+ return rc;
+ }
+ CDEBUG(D_CACHE, "add grant client %s\n", cli_name(client));
+ osc_update_next_shrink(client);
+ return 0;
}
static int osc_del_shrink_grant(struct client_obd *client)
if (cli->cl_avail_grant < 0) {
CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
- cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
+ cli_name(cli), cli->cl_avail_grant,
ocd->ocd_grant, cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
/* workaround for servers which do not have the patch from
* LU-2679 */
spin_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
- "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
- cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
+ "chunk bits: %d.\n", cli_name(cli), cli->cl_avail_grant,
+ cli->cl_lost_grant, cli->cl_chunkbits);
if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
list_empty(&cli->cl_grant_shrink_list))
if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
RETURN(-EINVAL); /* Fatal */
- if ((cmd & OBD_BRW_WRITE) != 0) {
- opc = OST_WRITE;
- req = ptlrpc_request_alloc_pool(cli->cl_import,
- cli->cl_import->imp_rq_pool,
- &RQF_OST_BRW_WRITE);
- } else {
- opc = OST_READ;
- req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
- }
+ if ((cmd & OBD_BRW_WRITE) != 0) {
+ opc = OST_WRITE;
+ req = ptlrpc_request_alloc_pool(cli->cl_import,
+ osc_rq_pool,
+ &RQF_OST_BRW_WRITE);
+ } else {
+ opc = OST_READ;
+ req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
+ }
if (req == NULL)
RETURN(-ENOMEM);
static int osc_enqueue_fini(struct ptlrpc_request *req,
osc_enqueue_upcall_f upcall, void *cookie,
- struct lustre_handle *lockh, ldlm_mode_t mode,
+ struct lustre_handle *lockh, enum ldlm_mode mode,
__u64 *flags, int agl, int errcode)
{
bool intent = *flags & LDLM_FL_HAS_INTENT;
}
static int osc_enqueue_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- struct osc_enqueue_args *aa, int rc)
+ struct ptlrpc_request *req,
+ struct osc_enqueue_args *aa, int rc)
{
struct ldlm_lock *lock;
struct lustre_handle *lockh = &aa->oa_lockh;
- ldlm_mode_t mode = aa->oa_mode;
+ enum ldlm_mode mode = aa->oa_mode;
struct ost_lvb *lvb = aa->oa_lvb;
__u32 lvb_len = sizeof(*lvb);
__u64 flags = 0;
 * is evicted from the cluster -- such scenarios make life difficult, so
* release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
- __u64 *flags, ldlm_policy_data_t *policy,
+ __u64 *flags, union ldlm_policy_data *policy,
struct ost_lvb *lvb, int kms_valid,
osc_enqueue_upcall_f upcall, void *cookie,
struct ldlm_enqueue_info *einfo,
struct ptlrpc_request *req = NULL;
int intent = *flags & LDLM_FL_HAS_INTENT;
__u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
- ldlm_mode_t mode;
+ enum ldlm_mode mode;
int rc;
ENTRY;
}
int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
- __u32 type, ldlm_policy_data_t *policy, __u32 mode,
- __u64 *flags, void *data, struct lustre_handle *lockh,
- int unref)
+ enum ldlm_type type, union ldlm_policy_data *policy,
+ enum ldlm_mode mode, __u64 *flags, void *data,
+ struct lustre_handle *lockh, int unref)
{
struct obd_device *obd = exp->exp_obd;
__u64 lflags = *flags;
- ldlm_mode_t rc;
+ enum ldlm_mode rc;
ENTRY;
- if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
- RETURN(-EIO);
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
+ RETURN(-EIO);
- /* Filesystem lock extents are extended to page boundaries so that
- * dealing with the page cache is a little smoother */
+ /* Filesystem lock extents are extended to page boundaries so that
+ * dealing with the page cache is a little smoother */
policy->l_extent.start -= policy->l_extent.start & ~PAGE_MASK;
policy->l_extent.end |= ~PAGE_MASK;
return rc;
}
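+/* cfs_hash_for_each_nolock() callback used when an import is invalidated:
+ * detach all granted locks on the resource from their osc_object by
+ * clearing l_ast_data, then invalidate the object itself. */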
+static int osc_ldlm_resource_invalidate(struct cfs_hash *hs,
+ struct cfs_hash_bd *bd, struct hlist_node *hnode, void *arg)
+{
+ struct lu_env *env = arg;
+ struct ldlm_resource *res = cfs_hash_object(hs, hnode);
+ struct ldlm_lock *lock;
+ struct osc_object *osc = NULL;
+ ENTRY;
+
+ lock_res(res);
+ list_for_each_entry(lock, &res->lr_granted, l_res_link) {
+ if (lock->l_ast_data != NULL && osc == NULL) {
+ osc = lock->l_ast_data;
+ cl_object_get(osc2cl(osc));
+ }
+ lock->l_ast_data = NULL;
+ }
+ unlock_res(res);
+
+ if (osc != NULL) {
+ osc_object_invalidate(env, osc);
+ cl_object_put(env, osc2cl(osc));
+ }
+
+ RETURN(0);
+}
+
static int osc_import_event(struct obd_device *obd,
struct obd_import *imp,
enum obd_import_event event)
struct lu_env *env;
int refcheck;
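+	/* The namespace is cleaned up twice: once up front to cancel
+	 * unused local locks, and once again below, after the resources
+	 * have been detached from their osc objects. */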
+ ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
+
env = cl_env_get(&refcheck);
if (!IS_ERR(env)) {
- /* Reset grants */
- cli = &obd->u.cli;
- /* all pages go to failing rpcs due to the invalid
- * import */
- osc_io_unplug(env, cli, NULL);
-
- ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
- cl_env_put(env, &refcheck);
+ osc_io_unplug(env, &obd->u.cli, NULL);
+
+ cfs_hash_for_each_nolock(ns->ns_rs_hash,
+ osc_ldlm_resource_invalidate,
+ env, 0);
+ cl_env_put(env, &refcheck);
+
+ ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
} else
rc = PTR_ERR(env);
break;
struct obd_type *type;
void *handler;
int rc;
+ int adding;
+ int added;
+ int req_count;
ENTRY;
rc = ptlrpcd_addref();
ptlrpc_lprocfs_register_obd(obd);
}
- /* We need to allocate a few requests more, because
- * brw_interpret tries to create new requests before freeing
- * previous ones, Ideally we want to have 2x max_rpcs_in_flight
- * reserved, but I'm afraid that might be too much wasted RAM
- * in fact, so 2 is just my guess and still should work. */
- cli->cl_import->imp_rq_pool =
- ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
- OST_MAXREQSIZE,
- ptlrpc_add_rqs_to_pool);
+	/*
+	 * We try to control the total number of requests with an upper
+	 * limit, osc_reqpool_maxreqcount. Racing allocations may push the
+	 * count slightly over the limit, but that is harmless.
+	 */
+ req_count = atomic_read(&osc_pool_req_count);
+ if (req_count < osc_reqpool_maxreqcount) {
+ adding = cli->cl_max_rpcs_in_flight + 2;
+ if (req_count + adding > osc_reqpool_maxreqcount)
+ adding = osc_reqpool_maxreqcount - req_count;
+
+ added = ptlrpc_add_rqs_to_pool(osc_rq_pool, adding);
+ atomic_add(added, &osc_pool_req_count);
+ }
INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
ns_register_cancel(obd->obd_namespace, osc_cancel_weight);
}
/* free memory of osc quota cache */
- osc_quota_cleanup(obd);
+ osc_quota_cleanup(obd);
- rc = client_obd_cleanup(obd);
+ rc = client_obd_cleanup(obd);
- ptlrpcd_decref();
- RETURN(rc);
+ ptlrpcd_decref();
+ RETURN(rc);
}
int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
{
bool enable_proc = true;
struct obd_type *type;
+ unsigned int reqpool_size;
+ unsigned int reqsize;
int rc;
+
ENTRY;
/* print an address of _any_ initialized kernel symbol from this
rc = class_register_type(&osc_obd_ops, NULL, enable_proc, NULL,
LUSTRE_OSC_NAME, &osc_device_type);
- if (rc) {
- lu_kmem_fini(osc_caches);
- RETURN(rc);
- }
+ if (rc)
+ GOTO(out_kmem, rc);
+
+	/* Reject zero and anything large enough to overflow the shift
+	 * below; 4GB+ is obviously too much memory for a request pool. */
+ if (osc_reqpool_mem_max >= 1 << 12 || osc_reqpool_mem_max == 0)
+ GOTO(out_type, rc = -EINVAL);
+
+ reqpool_size = osc_reqpool_mem_max << 20;
+
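+	/* Round the request size up to the next power of two; buffers of
+	 * this size are likely to be rounded up that way by the allocator
+	 * anyway, so use the rounded size when computing how many requests
+	 * fit into reqpool_size. */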
+ reqsize = 1;
+ while (reqsize < OST_IO_MAXREQSIZE)
+ reqsize = reqsize << 1;
+
+	/*
+	 * We don't enlarge the request count in the OSC pool according to
+	 * cl_max_rpcs_in_flight. Allocation from the pool is only tried
+	 * after a normal allocation has failed, so a small OSC pool won't
+	 * cause much performance degradation in most cases.
+	 */
+ osc_reqpool_maxreqcount = reqpool_size / reqsize;
+ atomic_set(&osc_pool_req_count, 0);
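+	/* The pool starts empty; each OSC device adds its share of requests
+	 * at setup time, capped overall by osc_reqpool_maxreqcount. */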
+ osc_rq_pool = ptlrpc_init_rq_pool(0, OST_IO_MAXREQSIZE,
+ ptlrpc_add_rqs_to_pool);
+
+ if (osc_rq_pool != NULL)
+ GOTO(out, rc);
+ rc = -ENOMEM;
+out_type:
+ class_unregister_type(LUSTRE_OSC_NAME);
+out_kmem:
+ lu_kmem_fini(osc_caches);
+out:
RETURN(rc);
}
{
class_unregister_type(LUSTRE_OSC_NAME);
lu_kmem_fini(osc_caches);
+ ptlrpc_free_rq_pool(osc_rq_pool);
}
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");