-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
/*
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
- */
-/*
- * Copyright (c) 2011 Whamcloud, Inc.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#define DEBUG_SUBSYSTEM S_OSC
#include <libcfs/libcfs.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include "osc_internal.h"
-
-static quota_interface_t *quota_interface = NULL;
-extern quota_interface_t osc_quota_interface;
+#include "osc_cl_internal.h"
static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
RETURN(0);
}
-static int osc_getattr(struct obd_export *exp, struct obd_info *oinfo)
+static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
+ struct obd_info *oinfo)
{
struct ptlrpc_request *req;
struct ost_body *body;
return rc;
}
-static int osc_setattr(struct obd_export *exp, struct obd_info *oinfo,
- struct obd_trans_info *oti)
+static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
+ struct obd_info *oinfo, struct obd_trans_info *oti)
{
struct ptlrpc_request *req;
struct ost_body *body;
ptlrpc_request_set_replen(req);
-
req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
sa = ptlrpc_req_async_args(req);
RETURN(0);
}
-static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,
- struct obd_trans_info *oti,
+static int osc_punch(const struct lu_env *env, struct obd_export *exp,
+ struct obd_info *oinfo, struct obd_trans_info *oti,
struct ptlrpc_request_set *rqset)
{
oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
struct ptlrpc_request *req,
void *arg, int rc)
{
- struct osc_async_args *aa = arg;
+ struct osc_fsync_args *fa = arg;
struct ost_body *body;
ENTRY;
GOTO(out, rc = -EPROTO);
}
- *aa->aa_oi->oi_oa = body->oa;
+ *fa->fa_oi->oi_oa = body->oa;
out:
- rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
- RETURN(rc);
+ rc = fa->fa_upcall(fa->fa_cookie, rc);
+ RETURN(rc);
}
-static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
- obd_size start, obd_size end,
- struct ptlrpc_request_set *set)
+int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
+ obd_enqueue_update_f upcall, void *cookie,
+ struct ptlrpc_request_set *rqset)
{
- struct ptlrpc_request *req;
- struct ost_body *body;
- struct osc_async_args *aa;
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ struct osc_fsync_args *fa;
int rc;
ENTRY;
- if (!oinfo->oi_oa) {
- CDEBUG(D_INFO, "oa NULL\n");
- RETURN(-EINVAL);
- }
-
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
if (req == NULL)
RETURN(-ENOMEM);
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
- body->oa.o_size = start;
- body->oa.o_blocks = end;
- body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
osc_pack_capa(req, body, oinfo->oi_capa);
ptlrpc_request_set_replen(req);
req->rq_interpret_reply = osc_sync_interpret;
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- aa->aa_oi = oinfo;
+ CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
+ fa = ptlrpc_req_async_args(req);
+ fa->fa_oi = oinfo;
+ fa->fa_upcall = upcall;
+ fa->fa_cookie = cookie;
- ptlrpc_set_add_req(set, req);
- RETURN (0);
+ if (rqset == PTLRPCD_SET)
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ else
+ ptlrpc_set_add_req(rqset, req);
+
+ RETURN(0);
+}
+
+static int osc_sync(const struct lu_env *env, struct obd_export *exp,
+ struct obd_info *oinfo, obd_size start, obd_size end,
+ struct ptlrpc_request_set *set)
+{
+ ENTRY;
+
+ if (!oinfo->oi_oa) {
+ CDEBUG(D_INFO, "oa NULL\n");
+ RETURN(-EINVAL);
+ }
+
+ oinfo->oi_oa->o_size = start;
+ oinfo->oi_oa->o_blocks = end;
+ oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
+
+ RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
}
/* Find and cancel locally locks matched by @mode in the resource found by
int count;
ENTRY;
+ /* Return, i.e. cancel nothing, only if ELC is supported (flag in
+ * export) but disabled through procfs (flag in NS).
+ *
+ * This distinguishes it from the case where ELC was never supported,
+ * in which we still want to cancel locks in advance and just cancel
+ * them locally, without sending any RPC. */
+ if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
+ RETURN(0);
+
osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
if (res == NULL)
return 0;
}
+int osc_create(const struct lu_env *env, struct obd_export *exp,
+ struct obdo *oa, struct lov_stripe_md **ea,
+ struct obd_trans_info *oti)
+{
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(oa);
+ LASSERT(ea);
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ oa->o_flags == OBD_FL_RECREATE_OBJS) {
+ RETURN(osc_real_create(exp, oa, ea, oti));
+ }
+
+ if (!fid_seq_is_mdt(oa->o_seq))
+ RETURN(osc_real_create(exp, oa, ea, oti));
+
+ /* we should not get here anymore */
+ LBUG();
+
+ RETURN(rc);
+}
+
/* Destroy requests can be async always on the client, and we don't even really
* care about the return code since the client cannot do anything at all about
* a destroy failure.
* the records are not cancelled, and when the OST reconnects to the MDS next,
* it will retrieve the llog unlink logs and then sends the log cancellation
* cookies to the MDS after committing destroy transactions. */
-static int osc_destroy(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *ea, struct obd_trans_info *oti,
- struct obd_export *md_export, void *capa)
+static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
+ struct obdo *oa, struct lov_stripe_md *ea,
+ struct obd_trans_info *oti, struct obd_export *md_export,
+ void *capa)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
struct ptlrpc_request *req;
osc_pack_capa(req, body, (struct obd_capa *)capa);
ptlrpc_request_set_replen(req);
- /* don't throttle destroy RPCs for the MDT */
- if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
+ /* If osc_destroy is for destroying the unlink orphan sent from MDT
+ * to OST, it should not be blocked here, because the process might
+ * be triggered by ptlrpcd, and it is not good to block a ptlrpcd
+ * thread (b=16006) */
+ if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
req->rq_interpret_reply = osc_destroy_interpret;
if (!osc_can_send_destroy(cli)) {
struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
(cli->cl_max_rpcs_in_flight + 1);
oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
}
- oa->o_grant = cli->cl_avail_grant;
+ oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
oa->o_dropped = cli->cl_lost_grant;
cli->cl_lost_grant = 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
}
-static void osc_update_next_shrink(struct client_obd *cli)
+void osc_update_next_shrink(struct client_obd *cli)
{
cli->cl_next_shrink_grant =
cfs_time_shift(cli->cl_grant_shrink_interval);
cli->cl_next_shrink_grant);
}
-/* caller must hold loi_list_lock */
-static void osc_consume_write_grant(struct client_obd *cli,
- struct brw_page *pga)
-{
- LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
- LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
- cfs_atomic_inc(&obd_dirty_pages);
- cli->cl_dirty += CFS_PAGE_SIZE;
- cli->cl_avail_grant -= CFS_PAGE_SIZE;
- pga->flag |= OBD_BRW_FROM_GRANT;
- CDEBUG(D_CACHE, "using %lu grant credits for brw %p page %p\n",
- CFS_PAGE_SIZE, pga, pga->pg);
- LASSERT(cli->cl_avail_grant >= 0);
- osc_update_next_shrink(cli);
-}
-
-/* the companion to osc_consume_write_grant, called when a brw has completed.
- * must be called with the loi lock held. */
-static void osc_release_write_grant(struct client_obd *cli,
- struct brw_page *pga, int sent)
-{
- int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
- ENTRY;
-
- LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
- if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
- EXIT;
- return;
- }
-
- pga->flag &= ~OBD_BRW_FROM_GRANT;
- cfs_atomic_dec(&obd_dirty_pages);
- cli->cl_dirty -= CFS_PAGE_SIZE;
- if (pga->flag & OBD_BRW_NOCACHE) {
- pga->flag &= ~OBD_BRW_NOCACHE;
- cfs_atomic_dec(&obd_dirty_transit_pages);
- cli->cl_dirty_transit -= CFS_PAGE_SIZE;
- }
- if (!sent) {
- cli->cl_lost_grant += CFS_PAGE_SIZE;
- CDEBUG(D_CACHE, "lost grant: %lu avail grant: %lu dirty: %lu\n",
- cli->cl_lost_grant, cli->cl_avail_grant, cli->cl_dirty);
- } else if (CFS_PAGE_SIZE != blocksize && pga->count != CFS_PAGE_SIZE) {
- /* For short writes we shouldn't count parts of pages that
- * span a whole block on the OST side, or our accounting goes
- * wrong. Should match the code in filter_grant_check. */
- int offset = pga->off & ~CFS_PAGE_MASK;
- int count = pga->count + (offset & (blocksize - 1));
- int end = (offset + pga->count) & (blocksize - 1);
- if (end)
- count += blocksize - end;
-
- cli->cl_lost_grant += CFS_PAGE_SIZE - count;
- CDEBUG(D_CACHE, "lost %lu grant: %lu avail: %lu dirty: %lu\n",
- CFS_PAGE_SIZE - count, cli->cl_lost_grant,
- cli->cl_avail_grant, cli->cl_dirty);
- }
-
- EXIT;
-}
-
-static unsigned long rpcs_in_flight(struct client_obd *cli)
-{
- return cli->cl_r_in_flight + cli->cl_w_in_flight;
-}
-
-/* caller must hold loi_list_lock */
-void osc_wake_cache_waiters(struct client_obd *cli)
-{
- cfs_list_t *l, *tmp;
- struct osc_cache_waiter *ocw;
-
- ENTRY;
- cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
- /* if we can't dirty more, we must wait until some is written */
- if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
- (cfs_atomic_read(&obd_dirty_pages) + 1 >
- obd_max_dirty_pages)) {
- CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
- "osc max %ld, sys max %d\n", cli->cl_dirty,
- cli->cl_dirty_max, obd_max_dirty_pages);
- return;
- }
-
- /* if still dirty cache but no grant wait for pending RPCs that
- * may yet return us some grant before doing sync writes */
- if (cli->cl_w_in_flight && cli->cl_avail_grant < CFS_PAGE_SIZE) {
- CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
- cli->cl_w_in_flight);
- return;
- }
-
- ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
- cfs_list_del_init(&ocw->ocw_entry);
- if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
- /* no more RPCs in flight to return grant, do sync IO */
- ocw->ocw_rc = -EDQUOT;
- CDEBUG(D_INODE, "wake oap %p for sync\n", ocw->ocw_oap);
- } else {
- osc_consume_write_grant(cli,
- &ocw->ocw_oap->oap_brw_page);
- }
-
- cfs_waitq_signal(&ocw->ocw_waitq);
- }
-
- EXIT;
-}
-
static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
client_obd_list_lock(&cli->cl_loi_list_lock);
}
}
-static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
- void *key, obd_count vallen, void *val,
- struct ptlrpc_request_set *set);
+static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
+ obd_count keylen, void *key, obd_count vallen,
+ void *val, struct ptlrpc_request_set *set);
static int osc_shrink_grant_interpret(const struct lu_env *env,
struct ptlrpc_request *req,
body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
osc_update_next_shrink(cli);
- rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
+ rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
sizeof(*body), body, NULL);
if (rc != 0)
cli->cl_avail_grant = ocd->ocd_grant;
}
- client_obd_list_unlock(&cli->cl_loi_list_lock);
+ /* determine the appropriate chunk size used by osc_extent. */
+ cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
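+ /* Illustrative arithmetic (assuming ocd_blocksize is the log2 of the
+ * server block size): with 4K pages (CFS_PAGE_SHIFT = 12) and a server
+ * advertising 64K blocks (ocd_blocksize = 16), cl_chunkbits becomes 16,
+ * i.e. extents are managed in 64K chunks. */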
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
- cli->cl_import->imp_obd->obd_name,
- cli->cl_avail_grant, cli->cl_lost_grant);
+ CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld. "
+ "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
+ cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
- if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
- cfs_list_empty(&cli->cl_grant_shrink_list))
- osc_add_shrink_grant(cli);
+ if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
+ cfs_list_empty(&cli->cl_grant_shrink_list))
+ osc_add_shrink_grant(cli);
}
/* We assume that the reason this OSC got a short read is because it read
static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
if (p1->flag != p2->flag) {
- unsigned mask = ~(OBD_BRW_FROM_GRANT|
- OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
+ unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
+ OBD_BRW_SYNC | OBD_BRW_ASYNC | OBD_BRW_NOQUOTA);
/* warn if we try to combine flags that we don't know to be
* safe to combine */
- if ((p1->flag & mask) != (p2->flag & mask))
- CERROR("is it ok to have flags 0x%x and 0x%x in the "
- "same brw?\n", p1->flag, p2->flag);
+ if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
+ CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
+ "report this at http://bugs.whamcloud.com/\n",
+ p1->flag, p2->flag);
+ }
return 0;
}
}
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
- struct brw_page **pga, int opc,
- cksum_type_t cksum_type)
-{
- __u32 cksum;
- int i = 0;
-
- LASSERT (pg_count > 0);
- cksum = init_checksum(cksum_type);
- while (nob > 0 && pg_count > 0) {
- unsigned char *ptr = cfs_kmap(pga[i]->pg);
- int off = pga[i]->off & ~CFS_PAGE_MASK;
- int count = pga[i]->count > nob ? nob : pga[i]->count;
-
- /* corrupt the data before we compute the checksum, to
- * simulate an OST->client data error */
- if (i == 0 && opc == OST_READ &&
- OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
- memcpy(ptr + off, "bad1", min(4, nob));
- cksum = compute_checksum(cksum, ptr + off, count, cksum_type);
- cfs_kunmap(pga[i]->pg);
- LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
- off, cksum);
-
- nob -= pga[i]->count;
- pg_count--;
- i++;
- }
- /* For sending we only compute the wrong checksum instead
- * of corrupting the data so it is still correct on a redo */
- if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
- cksum++;
-
- return cksum;
+ struct brw_page **pga, int opc,
+ cksum_type_t cksum_type)
+{
+ __u32 cksum = 0; /* read by the per-page debug print before the final
+ * hash pass; initialize so the print is well-defined */
+ int i = 0;
+ struct cfs_crypto_hash_desc *hdesc;
+ unsigned int bufsize;
+ int err;
+ unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
+
+ LASSERT(pg_count > 0);
+
+ hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+ if (IS_ERR(hdesc)) {
+ CERROR("Unable to initialize checksum hash %s\n",
+ cfs_crypto_hash_name(cfs_alg));
+ return PTR_ERR(hdesc);
+ }
+
+ while (nob > 0 && pg_count > 0) {
+ int count = pga[i]->count > nob ? nob : pga[i]->count;
+
+ /* corrupt the data before we compute the checksum, to
+ * simulate an OST->client data error */
+ if (i == 0 && opc == OST_READ &&
+ OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
+ unsigned char *ptr = cfs_kmap(pga[i]->pg);
+ int off = pga[i]->off & ~CFS_PAGE_MASK;
+ memcpy(ptr + off, "bad1", min(4, nob));
+ cfs_kunmap(pga[i]->pg);
+ }
+ cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
+ pga[i]->off & ~CFS_PAGE_MASK,
+ count);
+ LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
+ (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
+
+ nob -= pga[i]->count;
+ pg_count--;
+ i++;
+ }
+
+ bufsize = 4;
+ err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+
+ if (err)
+ cfs_crypto_hash_final(hdesc, NULL, NULL);
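+ /* a failed final pass leaves the descriptor allocated; calling
+ * cfs_crypto_hash_final() again with a NULL buffer only releases it
+ * (assumption based on the libcfs crypto API contract) */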
+
+ /* For sending we only compute the wrong checksum instead
+ * of corrupting the data so it is still correct on a redo */
+ if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
+ cksum++;
+
+ return cksum;
}
static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
}
req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
ptlrpc_at_set_req_timeout(req);
+ /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
+ * retry logic */
+ req->rq_no_retry_einprogress = 1;
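+ /* (that retry logic lives in osc_brw_internal() and brw_interpret(),
+ * which resend the request themselves on -EINPROGRESS) */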
if (opc == OST_WRITE)
desc = ptlrpc_prep_bulk_imp(req, page_count,
LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
(pg->flag & OBD_BRW_SRVLOCK));
- ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
+ ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) {
/* size[REQ_REC_OFF] still sizeof (*body) */
if (opc == OST_WRITE) {
- if (unlikely(cli->cl_checksum) &&
+ if (cli->cl_checksum &&
!sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
/* store cl_cksum_type in a local variable since
* it can be changed via lprocfs */
req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
sizeof(__u32) * niocount);
} else {
- if (unlikely(cli->cl_checksum) &&
+ if (cli->cl_checksum &&
!sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
body->oa.o_flags = 0;
return 0;
}
- /* If this is mmaped file - it can be changed at any time */
- if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
- return 1;
-
cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
oa->o_flags : 0);
new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
RETURN(-EPROTO);
}
-#ifdef HAVE_QUOTA_SUPPORT
/* set/clear over quota flag for a uid/gid */
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
body->oa.o_flags);
- lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
- body->oa.o_flags);
+ osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
}
-#endif
osc_update_grant(cli, body);
struct ptlrpc_request *req;
int rc;
cfs_waitq_t waitq;
- int resends = 0;
+ int generation, resends = 0;
struct l_wait_info lwi;
ENTRY;
cfs_waitq_init(&waitq);
+ generation = exp->exp_obd->u.cli.cl_import->imp_generation;
restart_bulk:
rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
if (rc != 0)
return (rc);
+ if (resends) {
+ req->rq_generation_set = 1;
+ req->rq_import_generation = generation;
+ req->rq_sent = cfs_time_current_sec() + resends;
+ }
+
rc = ptlrpc_queue_wait(req);
if (rc == -ETIMEDOUT && req->rq_resend) {
rc = osc_brw_fini_request(req, rc);
ptlrpc_req_finished(req);
+ /* When the server returns -EINPROGRESS, the client should always
+ * retry regardless of how many times the bulk was already resent. */
if (osc_recoverable_error(rc)) {
resends++;
- if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
- CERROR("too many resend retries, returning error\n");
- RETURN(-EIO);
+ if (rc != -EINPROGRESS &&
+ !client_should_resend(resends, &exp->exp_obd->u.cli)) {
+ CERROR("%s: too many resend retries for object: "
+ ""LPU64":"LPU64", rc = %d.\n",
+ exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
+ goto out;
+ }
+ if (generation !=
+ exp->exp_obd->u.cli.cl_import->imp_generation) {
+ CDEBUG(D_HA, "%s: resend cross eviction for object: "
+ ""LPU64":"LPU64", rc = %d.\n",
+ exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
+ goto out;
}
- lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL, NULL);
+ lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
+ NULL);
l_wait_event(waitq, 0, &lwi);
goto restart_bulk;
}
-
+out:
+ if (rc == -EAGAIN || rc == -EINPROGRESS)
+ rc = -EIO;
RETURN (rc);
}
-int osc_brw_redo_request(struct ptlrpc_request *request,
- struct osc_brw_async_args *aa)
+static int osc_brw_redo_request(struct ptlrpc_request *request,
+ struct osc_brw_async_args *aa, int rc)
{
struct ptlrpc_request *new_req;
- struct ptlrpc_request_set *set = request->rq_set;
struct osc_brw_async_args *new_aa;
struct osc_async_page *oap;
- int rc = 0;
ENTRY;
- if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
- CERROR("too many resent retries, returning error\n");
- RETURN(-EIO);
- }
-
- DEBUG_REQ(D_ERROR, request, "redo for recoverable error");
+ DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
+ "redo for recoverable error %d", rc);
rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
if (rc)
RETURN(rc);
- client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
-
cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
if (oap->oap_request != NULL) {
LASSERTF(request == oap->oap_request,
"request %p != oap_request %p\n",
request, oap->oap_request);
if (oap->oap_interrupted) {
- client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
ptlrpc_req_finished(new_req);
RETURN(-EINTR);
}
aa->aa_resends++;
new_req->rq_interpret_reply = request->rq_interpret_reply;
new_req->rq_async_args = request->rq_async_args;
- new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
+ /* cap the resend delay to the current request timeout; this is
+ * similar to what ptlrpc does (see after_reply()) */
+ if (aa->aa_resends > new_req->rq_timeout)
+ new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
+ else
+ new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
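+ /* e.g. with rq_timeout = 30s, the 40th resend would be delayed 30s
+ * rather than 40s (illustrative numbers) */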
+ new_req->rq_generation_set = 1;
+ new_req->rq_import_generation = request->rq_import_generation;
new_aa = ptlrpc_req_async_args(new_req);
CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
- cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
- CFS_INIT_LIST_HEAD(&aa->aa_oaps);
+ cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
+ CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
+ cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
if (oap->oap_request) {
new_aa->aa_ocapa = aa->aa_ocapa;
aa->aa_ocapa = NULL;
- /* use ptlrpc_set_add_req is safe because interpret functions work
- * in check_set context. only one way exist with access to request
- * from different thread got -EINTR - this way protected with
- * cl_loi_list_lock */
- ptlrpc_set_add_req(set, new_req);
+ /* XXX: This code will run into problems if we ever support adding
+ * a series of BRW RPCs to a self-defined ptlrpc_request_set and
+ * waiting for all of them to finish. We should inherit the request
+ * set from the old request. */
+ ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
- client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
-
- DEBUG_REQ(D_INFO, new_req, "new request");
- RETURN(0);
+ DEBUG_REQ(D_INFO, new_req, "new request");
+ RETURN(0);
}
/*
RETURN(rc);
}
-/* The companion to osc_enter_cache(), called when @oap is no longer part of
- * the dirty accounting. Writeback completes or truncate happens before
- * writing starts. Must be called with the loi lock held. */
-static void osc_exit_cache(struct client_obd *cli, struct osc_async_page *oap,
- int sent)
-{
- osc_release_write_grant(cli, &oap->oap_brw_page, sent);
-}
-
-
-/* This maintains the lists of pending pages to read/write for a given object
- * (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
- * to quickly find objects that are ready to send an RPC. */
-static int lop_makes_rpc(struct client_obd *cli, struct loi_oap_pages *lop,
- int cmd)
-{
- int optimal;
- ENTRY;
-
- if (lop->lop_num_pending == 0)
- RETURN(0);
-
- /* if we have an invalid import we want to drain the queued pages
- * by forcing them through rpcs that immediately fail and complete
- * the pages. recovery relies on this to empty the queued pages
- * before canceling the locks and evicting down the llite pages */
- if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
- RETURN(1);
-
- /* stream rpcs in queue order as long as as there is an urgent page
- * queued. this is our cheap solution for good batching in the case
- * where writepage marks some random page in the middle of the file
- * as urgent because of, say, memory pressure */
- if (!cfs_list_empty(&lop->lop_urgent)) {
- CDEBUG(D_CACHE, "urgent request forcing RPC\n");
- RETURN(1);
- }
- /* fire off rpcs when we have 'optimal' rpcs as tuned for the wire. */
- optimal = cli->cl_max_pages_per_rpc;
- if (cmd & OBD_BRW_WRITE) {
- /* trigger a write rpc stream as long as there are dirtiers
- * waiting for space. as they're waiting, they're not going to
- * create more pages to coalesce with what's waiting.. */
- if (!cfs_list_empty(&cli->cl_cache_waiters)) {
- CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
- RETURN(1);
- }
- /* +16 to avoid triggering rpcs that would want to include pages
- * that are being queued but which can't be made ready until
- * the queuer finishes with the page. this is a wart for
- * llite::commit_write() */
- optimal += 16;
- }
- if (lop->lop_num_pending >= optimal)
- RETURN(1);
-
- RETURN(0);
-}
-
-static int lop_makes_hprpc(struct loi_oap_pages *lop)
-{
- struct osc_async_page *oap;
- ENTRY;
-
- if (cfs_list_empty(&lop->lop_urgent))
- RETURN(0);
-
- oap = cfs_list_entry(lop->lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
-
- if (oap->oap_async_flags & ASYNC_HP) {
- CDEBUG(D_CACHE, "hp request forcing RPC\n");
- RETURN(1);
- }
-
- RETURN(0);
-}
-
-static void on_list(cfs_list_t *item, cfs_list_t *list,
- int should_be_on)
-{
- if (cfs_list_empty(item) && should_be_on)
- cfs_list_add_tail(item, list);
- else if (!cfs_list_empty(item) && !should_be_on)
- cfs_list_del_init(item);
-}
-
-/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
- * can find pages to build into rpcs quickly */
-void loi_list_maint(struct client_obd *cli, struct lov_oinfo *loi)
-{
- if (lop_makes_hprpc(&loi->loi_write_lop) ||
- lop_makes_hprpc(&loi->loi_read_lop)) {
- /* HP rpc */
- on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
- on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
- } else {
- on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
- on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
- lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
- lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
- }
-
- on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
- loi->loi_write_lop.lop_num_pending);
-
- on_list(&loi->loi_read_item, &cli->cl_loi_read_list,
- loi->loi_read_lop.lop_num_pending);
-}
-
-static void lop_update_pending(struct client_obd *cli,
- struct loi_oap_pages *lop, int cmd, int delta)
-{
- lop->lop_num_pending += delta;
- if (cmd & OBD_BRW_WRITE)
- cli->cl_pending_w_pages += delta;
- else
- cli->cl_pending_r_pages += delta;
-}
-
-/**
- * this is called when a sync waiter receives an interruption. Its job is to
- * get the caller woken as soon as possible. If its page hasn't been put in an
- * rpc yet it can dequeue immediately. Otherwise it has to mark the rpc as
- * desiring interruption which will forcefully complete the rpc once the rpc
- * has timed out.
- */
-int osc_oap_interrupted(const struct lu_env *env, struct osc_async_page *oap)
-{
- struct loi_oap_pages *lop;
- struct lov_oinfo *loi;
- int rc = -EBUSY;
- ENTRY;
-
- LASSERT(!oap->oap_interrupted);
- oap->oap_interrupted = 1;
-
- /* ok, it's been put in an rpc. only one oap gets a request reference */
- if (oap->oap_request != NULL) {
- ptlrpc_mark_interrupted(oap->oap_request);
- ptlrpcd_wake(oap->oap_request);
- ptlrpc_req_finished(oap->oap_request);
- oap->oap_request = NULL;
- }
-
- /*
- * page completion may be called only if ->cpo_prep() method was
- * executed by osc_io_submit(), that also adds page the to pending list
- */
- if (!cfs_list_empty(&oap->oap_pending_item)) {
- cfs_list_del_init(&oap->oap_pending_item);
- cfs_list_del_init(&oap->oap_urgent_item);
-
- loi = oap->oap_loi;
- lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
- &loi->loi_write_lop : &loi->loi_read_lop;
- lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, -1);
- loi_list_maint(oap->oap_cli, oap->oap_loi);
- rc = oap->oap_caller_ops->ap_completion(env,
- oap->oap_caller_data,
- oap->oap_cmd, NULL, -EINTR);
- }
-
- RETURN(rc);
-}
-
-/* this is trying to propogate async writeback errors back up to the
- * application. As an async write fails we record the error code for later if
- * the app does an fsync. As long as errors persist we force future rpcs to be
- * sync so that the app can get a sync error and break the cycle of queueing
- * pages for which writeback will fail. */
-static void osc_process_ar(struct osc_async_rc *ar, __u64 xid,
- int rc)
-{
- if (rc) {
- if (!ar->ar_rc)
- ar->ar_rc = rc;
-
- ar->ar_force_sync = 1;
- ar->ar_min_xid = ptlrpc_sample_next_xid();
- return;
-
- }
-
- if (ar->ar_force_sync && (xid >= ar->ar_min_xid))
- ar->ar_force_sync = 0;
-}
-
-void osc_oap_to_pending(struct osc_async_page *oap)
-{
- struct loi_oap_pages *lop;
-
- if (oap->oap_cmd & OBD_BRW_WRITE)
- lop = &oap->oap_loi->loi_write_lop;
- else
- lop = &oap->oap_loi->loi_read_lop;
-
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- else if (oap->oap_async_flags & ASYNC_URGENT)
- cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
- cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
- lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
-}
-
-/* this must be called holding the loi list lock to give coverage to exit_cache,
- * async_flag maintenance, and oap_request */
-static void osc_ap_completion(const struct lu_env *env,
- struct client_obd *cli, struct obdo *oa,
- struct osc_async_page *oap, int sent, int rc)
-{
- __u64 xid = 0;
-
- ENTRY;
- if (oap->oap_request != NULL) {
- xid = ptlrpc_req_xid(oap->oap_request);
- ptlrpc_req_finished(oap->oap_request);
- oap->oap_request = NULL;
- }
-
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags = 0;
- cfs_spin_unlock(&oap->oap_lock);
- oap->oap_interrupted = 0;
-
- if (oap->oap_cmd & OBD_BRW_WRITE) {
- osc_process_ar(&cli->cl_ar, xid, rc);
- osc_process_ar(&oap->oap_loi->loi_ar, xid, rc);
- }
-
- if (rc == 0 && oa != NULL) {
- if (oa->o_valid & OBD_MD_FLBLOCKS)
- oap->oap_loi->loi_lvb.lvb_blocks = oa->o_blocks;
- if (oa->o_valid & OBD_MD_FLMTIME)
- oap->oap_loi->loi_lvb.lvb_mtime = oa->o_mtime;
- if (oa->o_valid & OBD_MD_FLATIME)
- oap->oap_loi->loi_lvb.lvb_atime = oa->o_atime;
- if (oa->o_valid & OBD_MD_FLCTIME)
- oap->oap_loi->loi_lvb.lvb_ctime = oa->o_ctime;
- }
-
- rc = oap->oap_caller_ops->ap_completion(env, oap->oap_caller_data,
- oap->oap_cmd, oa, rc);
-
- /* ll_ap_completion (from llite) drops PG_locked. so, a new
- * I/O on the page could start, but OSC calls it under lock
- * and thus we can add oap back to pending safely */
- if (rc)
- /* upper layer wants to leave the page on pending queue */
- osc_oap_to_pending(oap);
- else
- osc_exit_cache(cli, oap, sent);
- EXIT;
-}
-
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
- struct osc_brw_async_args *aa = data;
- struct client_obd *cli;
- int async;
+ struct osc_brw_async_args *aa = data;
+ struct osc_extent *ext;
+ struct osc_extent *tmp;
+ struct cl_object *obj = NULL;
+ struct client_obd *cli = aa->aa_cli;
ENTRY;
rc = osc_brw_fini_request(req, rc);
CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
+ /* When the server returns -EINPROGRESS, the client should always
+ * retry regardless of how many times the bulk was already resent. */
if (osc_recoverable_error(rc)) {
- /* Only retry once for mmaped files since the mmaped page
- * might be modified at anytime. We have to retry at least
- * once in case there WAS really a corruption of the page
- * on the network, that was not caused by mmap() modifying
- * the page. Bug11742 */
- if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
- aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
- aa->aa_oa->o_flags & OBD_FL_MMAP) {
- rc = 0;
+ if (req->rq_import_generation !=
+ req->rq_import->imp_generation) {
+ CDEBUG(D_HA, "%s: resend cross eviction for object: "
+ ""LPU64":"LPU64", rc = %d.\n",
+ req->rq_import->imp_obd->obd_name,
+ aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
+ } else if (rc == -EINPROGRESS ||
+ client_should_resend(aa->aa_resends, aa->aa_cli)) {
+ rc = osc_brw_redo_request(req, aa, rc);
} else {
- rc = osc_brw_redo_request(req, aa);
- if (rc == 0)
- RETURN(0);
+ CERROR("%s: too many resent retries for object: "
+ ""LPU64":"LPU64", rc = %d.\n",
+ req->rq_import->imp_obd->obd_name,
+ aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
}
+
+ if (rc == 0)
+ RETURN(0);
+ else if (rc == -EAGAIN || rc == -EINPROGRESS)
+ rc = -EIO;
}
if (aa->aa_ocapa) {
aa->aa_ocapa = NULL;
}
- cli = aa->aa_cli;
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
-
- /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
- * is called so we know whether to go to sync BRWs or wait for more
- * RPCs to complete */
- if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
- cli->cl_w_in_flight--;
- else
- cli->cl_r_in_flight--;
-
- async = cfs_list_empty(&aa->aa_oaps);
- if (!async) { /* from osc_send_oap_rpc() */
- struct osc_async_page *oap, *tmp;
- /* the caller may re-use the oap after the completion call so
- * we need to clean it up a little */
- cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
- oap_rpc_item) {
- cfs_list_del_init(&oap->oap_rpc_item);
- osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
- }
- OBDO_FREE(aa->aa_oa);
- } else { /* from async_internal() */
- obd_count i;
- for (i = 0; i < aa->aa_page_count; i++)
- osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
- }
- osc_wake_cache_waiters(cli);
- osc_check_rpcs(env, cli);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- if (!async)
- cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
- req->rq_bulk->bd_nob_transferred);
- osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
- ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
-
- RETURN(rc);
+ cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
+ if (obj == NULL && rc == 0) {
+ obj = osc2cl(ext->oe_obj);
+ cl_object_get(obj);
+ }
+
+ cfs_list_del_init(&ext->oe_link);
+ osc_extent_finish(env, ext, 1, rc);
+ }
+ LASSERT(cfs_list_empty(&aa->aa_exts));
+ LASSERT(cfs_list_empty(&aa->aa_oaps));
+
+ if (obj != NULL) {
+ struct obdo *oa = aa->aa_oa;
+ struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+ unsigned long valid = 0;
+
+ LASSERT(rc == 0);
+ if (oa->o_valid & OBD_MD_FLBLOCKS) {
+ attr->cat_blocks = oa->o_blocks;
+ valid |= CAT_BLOCKS;
+ }
+ if (oa->o_valid & OBD_MD_FLMTIME) {
+ attr->cat_mtime = oa->o_mtime;
+ valid |= CAT_MTIME;
+ }
+ if (oa->o_valid & OBD_MD_FLATIME) {
+ attr->cat_atime = oa->o_atime;
+ valid |= CAT_ATIME;
+ }
+ if (oa->o_valid & OBD_MD_FLCTIME) {
+ attr->cat_ctime = oa->o_ctime;
+ valid |= CAT_CTIME;
+ }
+ if (valid != 0) {
+ cl_object_attr_lock(obj);
+ cl_object_attr_set(env, obj, attr, valid);
+ cl_object_attr_unlock(obj);
+ }
+ cl_object_put(env, obj);
+ }
+ OBDO_FREE(aa->aa_oa);
+
+ cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
+ req->rq_bulk->bd_nob_transferred);
+ osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+ ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
+ * is called so we know whether to go to sync BRWs or wait for more
+ * RPCs to complete */
+ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
+ cli->cl_w_in_flight--;
+ else
+ cli->cl_r_in_flight--;
+ osc_wake_cache_waiters(cli);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
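+ /* kick the cache: an RPC slot was freed above, so more queued
+ * extents may now be sent */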
+ RETURN(rc);
}
-static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
- struct client_obd *cli,
- cfs_list_t *rpc_list,
- int page_count, int cmd)
-{
- struct ptlrpc_request *req;
- struct brw_page **pga = NULL;
- struct osc_brw_async_args *aa;
+/**
+ * Build an RPC from the list of extents @ext_list. The caller must ensure
+ * that the total number of pages in this list does not exceed the maximum
+ * pages per RPC. Extents in the list must be in OES_RPC state.
+ */
+int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
+ cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
+{
+ struct ptlrpc_request *req = NULL;
+ struct osc_extent *ext;
+ CFS_LIST_HEAD(rpc_list);
+ struct brw_page **pga = NULL;
+ struct osc_brw_async_args *aa = NULL;
struct obdo *oa = NULL;
- const struct obd_async_page_ops *ops = NULL;
- void *caller_data = NULL;
struct osc_async_page *oap;
struct osc_async_page *tmp;
- struct ost_body *body;
struct cl_req *clerq = NULL;
enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
struct ldlm_lock *lock = NULL;
struct cl_req_attr crattr;
- int i, rc, mpflag = 0;
-
- ENTRY;
- LASSERT(!cfs_list_empty(rpc_list));
-
- if (cmd & OBD_BRW_MEMALLOC)
- mpflag = cfs_memory_pressure_get_and_set();
-
- memset(&crattr, 0, sizeof crattr);
- OBD_ALLOC(pga, sizeof(*pga) * page_count);
- if (pga == NULL)
- GOTO(out, req = ERR_PTR(-ENOMEM));
-
- OBDO_ALLOC(oa);
- if (oa == NULL)
- GOTO(out, req = ERR_PTR(-ENOMEM));
-
- i = 0;
- cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
- struct cl_page *page = osc_oap2cl_page(oap);
- if (ops == NULL) {
- ops = oap->oap_caller_ops;
- caller_data = oap->oap_caller_data;
-
- clerq = cl_req_alloc(env, page, crt,
- 1 /* only 1-object rpcs for
- * now */);
- if (IS_ERR(clerq))
- GOTO(out, req = (void *)clerq);
- lock = oap->oap_ldlm_lock;
- }
+ obd_off starting_offset = OBD_OBJECT_EOF;
+ obd_off ending_offset = 0;
+ int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
+
+ ENTRY;
+ LASSERT(!cfs_list_empty(ext_list));
+
+ /* add pages into rpc_list to build BRW rpc */
+ cfs_list_for_each_entry(ext, ext_list, oe_link) {
+ LASSERT(ext->oe_state == OES_RPC);
+ mem_tight |= ext->oe_memalloc;
+ cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
+ ++page_count;
+ cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+ if (starting_offset > oap->oap_obj_off)
+ starting_offset = oap->oap_obj_off;
+ else
+ LASSERT(oap->oap_page_off == 0);
+ if (ending_offset < oap->oap_obj_off + oap->oap_count)
+ ending_offset = oap->oap_obj_off +
+ oap->oap_count;
+ else
+ LASSERT(oap->oap_page_off + oap->oap_count ==
+ CFS_PAGE_SIZE);
+ }
+ }
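+ /* the LASSERTs above encode the contiguity invariant: only the first
+ * page of the RPC may start at a non-zero in-page offset and only the
+ * last one may end short of CFS_PAGE_SIZE, so the resulting pga array
+ * describes one contiguous byte range */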
+
+ if (mem_tight)
+ mpflag = cfs_memory_pressure_get_and_set();
+
+ memset(&crattr, 0, sizeof crattr);
+ OBD_ALLOC(pga, sizeof(*pga) * page_count);
+ if (pga == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ OBDO_ALLOC(oa);
+ if (oa == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ i = 0;
+ cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
+ struct cl_page *page = oap2cl_page(oap);
+ if (clerq == NULL) {
+ clerq = cl_req_alloc(env, page, crt,
+ 1 /* only 1-object rpcs for
+ * now */);
+ if (IS_ERR(clerq))
+ GOTO(out, rc = PTR_ERR(clerq));
+ lock = oap->oap_ldlm_lock;
+ }
+ if (mem_tight)
+ oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
pga[i] = &oap->oap_brw_page;
pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
}
/* always get the data for the obdo for the rpc */
- LASSERT(ops != NULL);
- crattr.cra_oa = oa;
- crattr.cra_capa = NULL;
+ LASSERT(clerq != NULL);
+ crattr.cra_oa = oa;
+ crattr.cra_capa = NULL;
+ memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
cl_req_attr_set(env, clerq, &crattr, ~0ULL);
if (lock) {
oa->o_handle = lock->l_remote_handle;
rc = cl_req_prep(env, clerq);
if (rc != 0) {
CERROR("cl_req_prep failed: %d\n", rc);
- GOTO(out, req = ERR_PTR(rc));
- }
-
- sort_brw_pages(pga, page_count);
- rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
- pga, &req, crattr.cra_capa, 1, 0);
- if (rc != 0) {
- CERROR("prep_req failed: %d\n", rc);
- GOTO(out, req = ERR_PTR(rc));
- }
-
- if (cmd & OBD_BRW_MEMALLOC)
+ GOTO(out, rc);
+ }
+
+ sort_brw_pages(pga, page_count);
+ rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
+ pga, &req, crattr.cra_capa, 1, 0);
+ if (rc != 0) {
+ CERROR("prep_req failed: %d\n", rc);
+ GOTO(out, rc);
+ }
+
+ req->rq_interpret_reply = brw_interpret;
+ if (mem_tight != 0)
req->rq_memalloc = 1;
/* Need to update the timestamps after the request is built in case
* later setattr before earlier BRW (as determined by the request xid),
* the OST will not use BRW timestamps. Sadly, there is no obvious
* way to do this in a single call. bug 10150 */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
cl_req_attr_set(env, clerq, &crattr,
OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- CFS_INIT_LIST_HEAD(&aa->aa_oaps);
- cfs_list_splice(rpc_list, &aa->aa_oaps);
- CFS_INIT_LIST_HEAD(rpc_list);
- aa->aa_clerq = clerq;
+ lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
+
+ CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+ aa = ptlrpc_req_async_args(req);
+ CFS_INIT_LIST_HEAD(&aa->aa_oaps);
+ cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
+ CFS_INIT_LIST_HEAD(&aa->aa_exts);
+ cfs_list_splice_init(ext_list, &aa->aa_exts);
+ aa->aa_clerq = clerq;
+
+ /* queued sync pages can be torn down while the pages are between
+ * the pending list and the RPC */
+ tmp = NULL;
+ cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+ /* only one oap gets a request reference */
+ if (tmp == NULL)
+ tmp = oap;
+ if (oap->oap_interrupted && !req->rq_intr) {
+ CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
+ oap, req);
+ ptlrpc_mark_interrupted(req);
+ }
+ }
+ if (tmp != NULL)
+ tmp->oap_request = ptlrpc_request_addref(req);
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ starting_offset >>= CFS_PAGE_SHIFT;
+ if (cmd == OBD_BRW_READ) {
+ cli->cl_r_in_flight++;
+ lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
+ lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
+ lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
+ starting_offset + 1);
+ } else {
+ cli->cl_w_in_flight++;
+ lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
+ lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
+ lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
+ starting_offset + 1);
+ }
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
+ page_count, aa, cli->cl_r_in_flight,
+ cli->cl_w_in_flight);
+
+ /* XXX: Maybe the caller can check the RPC bulk descriptor to
+ * see which CPU/NUMA node the majority of pages were allocated
+ * on, and try to assign the async RPC to that CPU core
+ * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
+ *
+ * On the other hand, we expect multiple ptlrpcd threads and the
+ * initial write sponsor to run in parallel, especially when data
+ * checksumming is enabled, which is a CPU-bound operation that a
+ * single ptlrpcd thread cannot process in time. So having more
+ * ptlrpcd threads share the BRW load (with PDL_POLICY_ROUND)
+ * seems better.
+ */
+ ptlrpcd_add_req(req, pol, -1);
+ rc = 0;
+ EXIT;
+
out:
- if (cmd & OBD_BRW_MEMALLOC)
- cfs_memory_pressure_restore(mpflag);
+ if (mem_tight != 0)
+ cfs_memory_pressure_restore(mpflag);
+
+ capa_put(crattr.cra_capa);
+ if (rc != 0) {
+ LASSERT(req == NULL);
- capa_put(crattr.cra_capa);
- if (IS_ERR(req)) {
if (oa)
OBDO_FREE(oa);
if (pga)
OBD_FREE(pga, sizeof(*pga) * page_count);
/* this should happen rarely and is pretty bad, it makes the
* pending list not follow the dirty order */
- client_obd_list_lock(&cli->cl_loi_list_lock);
- cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
- cfs_list_del_init(&oap->oap_rpc_item);
-
- /* queued sync pages can be torn down while the pages
- * were between the pending list and the rpc */
- if (oap->oap_interrupted) {
- CDEBUG(D_INODE, "oap %p interrupted\n", oap);
- osc_ap_completion(env, cli, NULL, oap, 0,
- oap->oap_count);
- continue;
- }
- osc_ap_completion(env, cli, NULL, oap, 0, PTR_ERR(req));
- }
- if (clerq && !IS_ERR(clerq))
- cl_req_completion(env, clerq, PTR_ERR(req));
- }
- RETURN(req);
+ while (!cfs_list_empty(ext_list)) {
+ ext = cfs_list_entry(ext_list->next, struct osc_extent,
+ oe_link);
+ cfs_list_del_init(&ext->oe_link);
+ osc_extent_finish(env, ext, 0, rc);
+ }
+ if (clerq && !IS_ERR(clerq))
+ cl_req_completion(env, clerq, rc);
+ }
+ RETURN(rc);
}
-/**
- * prepare pages for ASYNC io and put pages in send queue.
- *
- * \param cmd OBD_BRW_* macroses
- * \param lop pending pages
- *
- * \return zero if no page added to send queue.
- * \return 1 if pages successfully added to send queue.
- * \return negative on errors.
- */
-static int
-osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
- struct lov_oinfo *loi,
- int cmd, struct loi_oap_pages *lop)
+static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
+ struct ldlm_enqueue_info *einfo)
{
- struct ptlrpc_request *req;
- obd_count page_count = 0;
- struct osc_async_page *oap = NULL, *tmp;
- struct osc_brw_async_args *aa;
- const struct obd_async_page_ops *ops;
- CFS_LIST_HEAD(rpc_list);
- int srvlock = 0, mem_tight = 0;
- struct cl_object *clob = NULL;
- obd_off starting_offset = OBD_OBJECT_EOF;
- unsigned int ending_offset;
- int starting_page_off = 0;
- ENTRY;
-
- /* ASYNC_HP pages first. At present, when the lock the pages is
- * to be canceled, the pages covered by the lock will be sent out
- * with ASYNC_HP. We have to send out them as soon as possible. */
- cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
- if (++page_count >= cli->cl_max_pages_per_rpc)
- break;
- }
- page_count = 0;
-
- /* first we find the pages we're allowed to work with */
- cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
- oap_pending_item) {
- ops = oap->oap_caller_ops;
+ void *data = einfo->ei_cbdata;
+ int set = 0;
- LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
- "magic 0x%x\n", oap, oap->oap_magic);
+ LASSERT(lock != NULL);
+ LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
+ LASSERT(lock->l_resource->lr_type == einfo->ei_type);
+ LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
+ LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
- if (clob == NULL) {
- /* pin object in memory, so that completion call-backs
- * can be safely called under client_obd_list lock. */
- clob = osc_oap2cl_page(oap)->cp_obj;
- cl_object_get(clob);
- }
+ lock_res_and_lock(lock);
+ spin_lock(&osc_ast_guard);
- if (page_count != 0 &&
- srvlock != !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK)) {
- CDEBUG(D_PAGE, "SRVLOCK flag mismatch,"
- " oap %p, page %p, srvlock %u\n",
- oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
- break;
- }
+ if (lock->l_ast_data == NULL)
+ lock->l_ast_data = data;
+ if (lock->l_ast_data == data)
+ set = 1;
- /* If there is a gap at the start of this page, it can't merge
- * with any previous page, so we'll hand the network a
- * "fragmented" page array that it can't transfer in 1 RDMA */
- if (oap->oap_obj_off < starting_offset) {
- if (starting_page_off != 0)
- break;
+ spin_unlock(&osc_ast_guard);
+ unlock_res_and_lock(lock);
- starting_page_off = oap->oap_page_off;
- starting_offset = oap->oap_obj_off + starting_page_off;
- } else if (oap->oap_page_off != 0)
- break;
-
- /* in llite being 'ready' equates to the page being locked
- * until completion unlocks it. commit_write submits a page
- * as not ready because its unlock will happen unconditionally
- * as the call returns. if we race with commit_write giving
- * us that page we don't want to create a hole in the page
- * stream, so we stop and leave the rpc to be fired by
- * another dirtier or kupdated interval (the not ready page
- * will still be on the dirty list). we could call in
- * at the end of ll_file_write to process the queue again. */
- if (!(oap->oap_async_flags & ASYNC_READY)) {
- int rc = ops->ap_make_ready(env, oap->oap_caller_data,
- cmd);
- if (rc < 0)
- CDEBUG(D_INODE, "oap %p page %p returned %d "
- "instead of ready\n", oap,
- oap->oap_page, rc);
- switch (rc) {
- case -EAGAIN:
- /* llite is telling us that the page is still
- * in commit_write and that we should try
- * and put it in an rpc again later. we
- * break out of the loop so we don't create
- * a hole in the sequence of pages in the rpc
- * stream.*/
- oap = NULL;
- break;
- case -EINTR:
- /* the io isn't needed.. tell the checks
- * below to complete the rpc with EINTR */
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags |= ASYNC_COUNT_STABLE;
- cfs_spin_unlock(&oap->oap_lock);
- oap->oap_count = -EINTR;
- break;
- case 0:
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags |= ASYNC_READY;
- cfs_spin_unlock(&oap->oap_lock);
- break;
- default:
- LASSERTF(0, "oap %p page %p returned %d "
- "from make_ready\n", oap,
- oap->oap_page, rc);
- break;
- }
- }
- if (oap == NULL)
- break;
- /*
- * Page submitted for IO has to be locked. Either by
- * ->ap_make_ready() or by higher layers.
- */
-#if defined(__KERNEL__) && defined(__linux__)
- {
- struct cl_page *page;
-
- page = osc_oap2cl_page(oap);
-
- if (page->cp_type == CPT_CACHEABLE &&
- !(PageLocked(oap->oap_page) &&
- (CheckWriteback(oap->oap_page, cmd)))) {
- CDEBUG(D_PAGE, "page %p lost wb %lx/%x\n",
- oap->oap_page,
- (long)oap->oap_page->flags,
- oap->oap_async_flags);
- LBUG();
- }
- }
-#endif
-
- /* take the page out of our book-keeping */
- cfs_list_del_init(&oap->oap_pending_item);
- lop_update_pending(cli, lop, cmd, -1);
- cfs_list_del_init(&oap->oap_urgent_item);
-
- /* ask the caller for the size of the io as the rpc leaves. */
- if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
- oap->oap_count =
- ops->ap_refresh_count(env, oap->oap_caller_data,
- cmd);
- LASSERT(oap->oap_page_off + oap->oap_count <= CFS_PAGE_SIZE);
- }
- if (oap->oap_count <= 0) {
- CDEBUG(D_CACHE, "oap %p count %d, completing\n", oap,
- oap->oap_count);
- osc_ap_completion(env, cli, NULL,
- oap, 0, oap->oap_count);
- continue;
- }
-
- /* now put the page back in our accounting */
- cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
- if (page_count++ == 0)
- srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
-
- if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
- mem_tight = 1;
-
- /* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
- * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
- * have the same alignment as the initial writes that allocated
- * extents on the server. */
- ending_offset = oap->oap_obj_off + oap->oap_page_off +
- oap->oap_count;
- if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
- break;
-
- if (page_count >= cli->cl_max_pages_per_rpc)
- break;
-
- /* If there is a gap at the end of this page, it can't merge
- * with any subsequent pages, so we'll hand the network a
- * "fragmented" page array that it can't transfer in 1 RDMA */
- if (oap->oap_page_off + oap->oap_count < CFS_PAGE_SIZE)
- break;
- }
-
- osc_wake_cache_waiters(cli);
-
- loi_list_maint(cli, loi);
-
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-
- if (clob != NULL)
- cl_object_put(env, clob);
-
- if (page_count == 0) {
- client_obd_list_lock(&cli->cl_loi_list_lock);
- RETURN(0);
- }
-
- req = osc_build_req(env, cli, &rpc_list, page_count,
- mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
- if (IS_ERR(req)) {
- LASSERT(cfs_list_empty(&rpc_list));
- loi_list_maint(cli, loi);
- RETURN(PTR_ERR(req));
- }
-
- aa = ptlrpc_req_async_args(req);
-
- starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
- if (cmd == OBD_BRW_READ) {
- lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
- lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
- lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
- (starting_offset >> CFS_PAGE_SHIFT) + 1);
- } else {
- lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
- lprocfs_oh_tally(&cli->cl_write_rpc_hist,
- cli->cl_w_in_flight);
- lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
- (starting_offset >> CFS_PAGE_SHIFT) + 1);
- }
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
-
- if (cmd == OBD_BRW_READ)
- cli->cl_r_in_flight++;
- else
- cli->cl_w_in_flight++;
-
- /* queued sync pages can be torn down while the pages
- * were between the pending list and the rpc */
- tmp = NULL;
- cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
- /* only one oap gets a request reference */
- if (tmp == NULL)
- tmp = oap;
- if (oap->oap_interrupted && !req->rq_intr) {
- CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
- oap, req);
- ptlrpc_mark_interrupted(req);
- }
- }
- if (tmp != NULL)
- tmp->oap_request = ptlrpc_request_addref(req);
-
- DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
- page_count, aa, cli->cl_r_in_flight, cli->cl_w_in_flight);
-
- req->rq_interpret_reply = brw_interpret;
-
- /* XXX: Maybe the caller can check the RPC bulk descriptor to see which
- * CPU/NUMA node the majority of pages were allocated on, and try
- * to assign the async RPC to the CPU core (PDL_POLICY_PREFERRED)
- * to reduce cross-CPU memory traffic.
- *
- * But on the other hand, we expect that multiple ptlrpcd threads
- * and the initial write sponsor can run in parallel, especially
- * when data checksum is enabled, which is CPU-bound operation and
- * single ptlrpcd thread cannot process in time. So more ptlrpcd
- * threads sharing BRW load (with PDL_POLICY_ROUND) seems better.
- */
- ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
- RETURN(1);
-}
-
-#define LOI_DEBUG(LOI, STR, args...) \
- CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
- !cfs_list_empty(&(LOI)->loi_ready_item) || \
- !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
- (LOI)->loi_write_lop.lop_num_pending, \
- !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
- (LOI)->loi_read_lop.lop_num_pending, \
- !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
- args) \
-
-/* This is called by osc_check_rpcs() to find which objects have pages that
- * we could be sending. These lists are maintained by lop_makes_rpc(). */
-struct lov_oinfo *osc_next_loi(struct client_obd *cli)
-{
- ENTRY;
-
- /* First return objects that have blocked locks so that they
- * will be flushed quickly and other clients can get the lock,
- * then objects which have pages ready to be stuffed into RPCs */
- if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
- RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
- struct lov_oinfo, loi_hp_ready_item));
- if (!cfs_list_empty(&cli->cl_loi_ready_list))
- RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
- struct lov_oinfo, loi_ready_item));
-
- /* then if we have cache waiters, return all objects with queued
- * writes. This is especially important when many small files
- * have filled up the cache and not been fired into rpcs because
- * they don't pass the nr_pending/object threshhold */
- if (!cfs_list_empty(&cli->cl_cache_waiters) &&
- !cfs_list_empty(&cli->cl_loi_write_list))
- RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
- struct lov_oinfo, loi_write_item));
-
- /* then return all queued objects when we have an invalid import
- * so that they get flushed */
- if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
- if (!cfs_list_empty(&cli->cl_loi_write_list))
- RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
- struct lov_oinfo,
- loi_write_item));
- if (!cfs_list_empty(&cli->cl_loi_read_list))
- RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
- struct lov_oinfo, loi_read_item));
- }
- RETURN(NULL);
-}
-
-static int osc_max_rpc_in_flight(struct client_obd *cli, struct lov_oinfo *loi)
-{
- struct osc_async_page *oap;
- int hprpc = 0;
-
- if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
- oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
- hprpc = !!(oap->oap_async_flags & ASYNC_HP);
- }
-
- if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
- oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
- hprpc = !!(oap->oap_async_flags & ASYNC_HP);
- }
-
- return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
-}
-
-/* called with the loi list lock held */
-void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
-{
- struct lov_oinfo *loi;
- int rc = 0, race_counter = 0;
- ENTRY;
-
- while ((loi = osc_next_loi(cli)) != NULL) {
- LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
-
- if (osc_max_rpc_in_flight(cli, loi))
- break;
-
- /* attempt some read/write balancing by alternating between
- * reads and writes in an object. The makes_rpc checks here
- * would be redundant if we were getting read/write work items
- * instead of objects. we don't want send_oap_rpc to drain a
- * partial read pending queue when we're given this object to
- * do io on writes while there are cache waiters */
- if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
- rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
- &loi->loi_write_lop);
- if (rc < 0) {
- CERROR("Write request failed with %d\n", rc);
-
- /* osc_send_oap_rpc failed, mostly because of
- * memory pressure.
- *
- * We can't break here, because if:
- * - a page was submitted by osc_io_submit, so
- * that page is locked;
- * - no request is in flight; and
- * - there is no subsequent request;
- * then the system would live-lock, because
- * there would be no further chance to call
- * osc_io_unplug() and osc_check_rpcs().
- * pdflush can't help in this case either,
- * because it might be blocked grabbing
- * the page lock as mentioned above.
- *
- * Anyway, continue to drain pages. */
- /* break; */
- }
-
- if (rc > 0)
- race_counter = 0;
- else if (rc == 0)
- race_counter++;
- }
- if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
- rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
- &loi->loi_read_lop);
- if (rc < 0)
- CERROR("Read request failed with %d\n", rc);
-
- if (rc > 0)
- race_counter = 0;
- else if (rc == 0)
- race_counter++;
- }
-
- /* attempt some inter-object balancing by issuing rpcs
- * for each object in turn */
- if (!cfs_list_empty(&loi->loi_hp_ready_item))
- cfs_list_del_init(&loi->loi_hp_ready_item);
- if (!cfs_list_empty(&loi->loi_ready_item))
- cfs_list_del_init(&loi->loi_ready_item);
- if (!cfs_list_empty(&loi->loi_write_item))
- cfs_list_del_init(&loi->loi_write_item);
- if (!cfs_list_empty(&loi->loi_read_item))
- cfs_list_del_init(&loi->loi_read_item);
-
- loi_list_maint(cli, loi);
-
- /* send_oap_rpc fails with 0 when make_ready tells it to
- * back off. llite's make_ready does this when it tries
- * to lock a page queued for write that is already locked.
- * we want to try sending rpcs from many objects, but we
- * don't want to spin failing with 0. */
- if (race_counter == 10)
- break;
- }
- EXIT;
-}
-
-/* we're trying to queue a page in the osc so we're subject to the
- * 'cl_dirty_max' limit on the number of pages that can be queued in the osc.
- * If the osc's queued pages are already at that limit, then we want to sleep
- * until there is space in the osc's queue for us. We also may be waiting for
- * write credits from the OST if there are RPCs in flight that may return some
- * before we fall back to sync writes.
- *
- * We need this to know our allocation was granted in the presence of signals */
-static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
-{
- int rc;
- ENTRY;
- client_obd_list_lock(&cli->cl_loi_list_lock);
- rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(rc);
-};
-
-/**
- * Non-blocking version of osc_enter_cache() that consumes grant only when it
- * is available.
- */
-int osc_enter_cache_try(const struct lu_env *env,
- struct client_obd *cli, struct lov_oinfo *loi,
- struct osc_async_page *oap, int transient)
-{
- int has_grant;
-
- has_grant = cli->cl_avail_grant >= CFS_PAGE_SIZE;
- if (has_grant) {
- osc_consume_write_grant(cli, &oap->oap_brw_page);
- if (transient) {
- cli->cl_dirty_transit += CFS_PAGE_SIZE;
- cfs_atomic_inc(&obd_dirty_transit_pages);
- oap->oap_brw_flags |= OBD_BRW_NOCACHE;
- }
- }
- return has_grant;
-}
-
-/* Caller must hold loi_list_lock - we drop/regain it if we need to wait for
- * grant or cache space. */
-static int osc_enter_cache(const struct lu_env *env,
- struct client_obd *cli, struct lov_oinfo *loi,
- struct osc_async_page *oap)
-{
- struct osc_cache_waiter ocw;
- struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
-
- ENTRY;
-
- CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
- "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
- cli->cl_dirty_max, obd_max_dirty_pages,
- cli->cl_lost_grant, cli->cl_avail_grant);
-
- /* force the caller to try sync io. this can jump the list
- * of queued writes and create a discontiguous rpc stream */
- if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
- cli->cl_dirty_max < CFS_PAGE_SIZE ||
- cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
- RETURN(-EDQUOT);
-
- /* Hopefully normal case - cache space and write credits available */
- if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
- cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
- osc_enter_cache_try(env, cli, loi, oap, 0))
- RETURN(0);
-
- /* It is safe to block as a cache waiter as long as there is grant
- * space available or the hope of additional grant being returned
- * when an in-flight write completes. Using the writeback cache
- * if possible is preferable to sending the data synchronously
- * because write pages can then be merged into large requests.
- * The addition of this cache waiter will cause pending write
- * pages to be sent immediately. */
- if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
- cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
- cfs_waitq_init(&ocw.ocw_waitq);
- ocw.ocw_oap = oap;
- ocw.ocw_rc = 0;
-
- loi_list_maint(cli, loi);
- osc_check_rpcs(env, cli);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-
- CDEBUG(D_CACHE, "sleeping for cache space\n");
- l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
- if (!cfs_list_empty(&ocw.ocw_entry)) {
- cfs_list_del(&ocw.ocw_entry);
- RETURN(-EINTR);
- }
- RETURN(ocw.ocw_rc);
- }
-
- RETURN(-EDQUOT);
-}
-
-
-int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, cfs_page_t *page,
- obd_off offset, const struct obd_async_page_ops *ops,
- void *data, void **res, int nocache,
- struct lustre_handle *lockh)
-{
- struct osc_async_page *oap;
-
- ENTRY;
-
- if (!page)
- return cfs_size_round(sizeof(*oap));
-
- oap = *res;
- oap->oap_magic = OAP_MAGIC;
- oap->oap_cli = &exp->exp_obd->u.cli;
- oap->oap_loi = loi;
-
- oap->oap_caller_ops = ops;
- oap->oap_caller_data = data;
-
- oap->oap_page = page;
- oap->oap_obj_off = offset;
- if (!client_is_remote(exp) &&
- cfs_capable(CFS_CAP_SYS_RESOURCE))
- oap->oap_brw_flags = OBD_BRW_NOQUOTA;
-
- LASSERT(!(offset & ~CFS_PAGE_MASK));
-
- CFS_INIT_LIST_HEAD(&oap->oap_pending_item);
- CFS_INIT_LIST_HEAD(&oap->oap_urgent_item);
- CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
- CFS_INIT_LIST_HEAD(&oap->oap_page_list);
-
- cfs_spin_lock_init(&oap->oap_lock);
- CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
- RETURN(0);
-}
-
-int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
- struct lov_stripe_md *lsm, struct lov_oinfo *loi,
- struct osc_async_page *oap, int cmd, int off,
- int count, obd_flag brw_flags, enum async_flags async_flags)
-{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- int rc = 0;
- ENTRY;
-
- if (oap->oap_magic != OAP_MAGIC)
- RETURN(-EINVAL);
-
- if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
- RETURN(-EIO);
-
- if (!cfs_list_empty(&oap->oap_pending_item) ||
- !cfs_list_empty(&oap->oap_urgent_item) ||
- !cfs_list_empty(&oap->oap_rpc_item))
- RETURN(-EBUSY);
-
- /* check if the file's owner/group is over quota */
- if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
- struct cl_object *obj;
- struct cl_attr attr; /* XXX put attr into thread info */
- unsigned int qid[MAXQUOTAS];
-
- obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
-
- cl_object_attr_lock(obj);
- rc = cl_object_attr_get(env, obj, &attr);
- cl_object_attr_unlock(obj);
-
- qid[USRQUOTA] = attr.cat_uid;
- qid[GRPQUOTA] = attr.cat_gid;
- if (rc == 0 &&
- lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
- rc = -EDQUOT;
- if (rc)
- RETURN(rc);
- }
-
- if (loi == NULL)
- loi = lsm->lsm_oinfo[0];
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
-
- LASSERT(off + count <= CFS_PAGE_SIZE);
- oap->oap_cmd = cmd;
- oap->oap_page_off = off;
- oap->oap_count = count;
- oap->oap_brw_flags = brw_flags;
- /* Give a hint to OST that requests are coming from kswapd - bug19529 */
- if (cfs_memory_pressure_get())
- oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags = async_flags;
- cfs_spin_unlock(&oap->oap_lock);
-
- if (cmd & OBD_BRW_WRITE) {
- rc = osc_enter_cache(env, cli, loi, oap);
- if (rc) {
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(rc);
- }
- }
-
- osc_oap_to_pending(oap);
- loi_list_maint(cli, loi);
-
- LOI_DEBUG(loi, "oap %p page %p added for cmd %d\n", oap, oap->oap_page,
- cmd);
-
- osc_check_rpcs(env, cli);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-
- RETURN(0);
-}
-
-/* aka (~was & now & flag), but this is more clear :) */
-#define SETTING(was, now, flag) (!(was & flag) && (now & flag))
-
-int osc_set_async_flags_base(struct client_obd *cli,
- struct lov_oinfo *loi, struct osc_async_page *oap,
- obd_flag async_flags)
-{
- struct loi_oap_pages *lop;
- int flags = 0;
- ENTRY;
-
- LASSERT(!cfs_list_empty(&oap->oap_pending_item));
-
- if (oap->oap_cmd & OBD_BRW_WRITE) {
- lop = &loi->loi_write_lop;
- } else {
- lop = &loi->loi_read_lop;
- }
-
- if ((oap->oap_async_flags & async_flags) == async_flags)
- RETURN(0);
-
- if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
- flags |= ASYNC_READY;
-
- if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
- cfs_list_empty(&oap->oap_rpc_item)) {
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- else
- cfs_list_add_tail(&oap->oap_urgent_item,
- &lop->lop_urgent);
- flags |= ASYNC_URGENT;
- loi_list_maint(cli, loi);
- }
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags |= flags;
- cfs_spin_unlock(&oap->oap_lock);
-
- LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
- oap->oap_async_flags);
- RETURN(0);
-}
-
-int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, struct osc_async_page *oap)
-{
- struct client_obd *cli = &exp->exp_obd->u.cli;
- struct loi_oap_pages *lop;
- int rc = 0;
- ENTRY;
-
- if (oap->oap_magic != OAP_MAGIC)
- RETURN(-EINVAL);
-
- if (loi == NULL)
- loi = lsm->lsm_oinfo[0];
-
- if (oap->oap_cmd & OBD_BRW_WRITE) {
- lop = &loi->loi_write_lop;
- } else {
- lop = &loi->loi_read_lop;
- }
-
- client_obd_list_lock(&cli->cl_loi_list_lock);
-
- if (!cfs_list_empty(&oap->oap_rpc_item))
- GOTO(out, rc = -EBUSY);
-
- osc_exit_cache(cli, oap, 0);
- osc_wake_cache_waiters(cli);
-
- if (!cfs_list_empty(&oap->oap_urgent_item)) {
- cfs_list_del_init(&oap->oap_urgent_item);
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
- cfs_spin_unlock(&oap->oap_lock);
- }
- if (!cfs_list_empty(&oap->oap_pending_item)) {
- cfs_list_del_init(&oap->oap_pending_item);
- lop_update_pending(cli, lop, oap->oap_cmd, -1);
- }
- loi_list_maint(cli, loi);
- LOI_DEBUG(loi, "oap %p page %p torn down\n", oap, oap->oap_page);
-out:
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- RETURN(rc);
-}
-
-static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
- struct ldlm_enqueue_info *einfo)
-{
- void *data = einfo->ei_cbdata;
- int set = 0;
-
- LASSERT(lock != NULL);
- LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
- LASSERT(lock->l_resource->lr_type == einfo->ei_type);
- LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
- LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
-
- lock_res_and_lock(lock);
- cfs_spin_lock(&osc_ast_guard);
-
- if (lock->l_ast_data == NULL)
- lock->l_ast_data = data;
- if (lock->l_ast_data == data)
- set = 1;
-
- cfs_spin_unlock(&osc_ast_guard);
- unlock_res_and_lock(lock);
-
- return set;
-}
+ return set;
+}
static int osc_set_data_with_check(struct lustre_handle *lockh,
struct ldlm_enqueue_info *einfo)
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
obd_enqueue_update_f upcall, void *cookie,
- int *flags, int rc)
+ __u64 *flags, int agl, int rc)
{
int intent = *flags & LDLM_FL_HAS_INTENT;
ENTRY;
}
}
- if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) {
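+ /* A glimpse (intent) enqueue returns ELDLM_LOCK_ABORTED with a
+ * valid LVB when no lock was granted, so the LVB is usable both
+ * then and on success; an aborted AGL enqueue carries no LVB. */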
+ if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
+ (rc == 0)) {
*flags |= LDLM_FL_LVB_READY;
CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
struct ldlm_lock *lock;
struct lustre_handle handle;
__u32 mode;
+ struct ost_lvb *lvb;
+ __u32 lvb_len;
+ __u64 *flags = aa->oa_flags;
/* Make a local copy of a lock handle and a mode, because aa->oa_*
* might be freed anytime after lock upcall has been called. */
/* Let CP AST to grant the lock first. */
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
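+ /* An aborted AGL enqueue reply carries no LVB, so tell
+ * ldlm_cli_enqueue_fini() not to unpack one. */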
+ if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
+ lvb = NULL;
+ lvb_len = 0;
+ } else {
+ lvb = aa->oa_lvb;
+ lvb_len = sizeof(*aa->oa_lvb);
+ }
+
/* Complete obtaining the lock procedure. */
rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
- mode, aa->oa_flags, aa->oa_lvb,
- sizeof(*aa->oa_lvb), &handle, rc);
+ mode, flags, lvb, lvb_len, &handle, rc);
/* Complete osc stuff. */
- rc = osc_enqueue_fini(req, aa->oa_lvb,
- aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
+ rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
+ flags, aa->oa_agl, rc);
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
struct lov_oinfo *loi, int flags,
struct ost_lvb *lvb, __u32 mode, int rc)
{
+ struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
+
if (rc == ELDLM_OK) {
- struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
__u64 tmp;
LASSERT(lock != NULL);
lock->l_policy_data.l_extent.end);
}
ldlm_lock_allow_match(lock);
- LDLM_LOCK_PUT(lock);
} else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
+ LASSERT(lock != NULL);
loi->loi_lvb = *lvb;
+ ldlm_lock_allow_match(lock);
CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
" kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
rc = ELDLM_OK;
}
+
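+ /* If the enqueue ultimately failed, let anyone waiting to match
+ * this lock give up rather than wait for an LVB that will never
+ * become ready. */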
+ if (lock != NULL) {
+ if (rc != ELDLM_OK)
+ ldlm_lock_fail_match(lock);
+
+ LDLM_LOCK_PUT(lock);
+ }
}
EXPORT_SYMBOL(osc_update_enqueue);
* is excluded from the cluster -- such scenarios make life difficult, so
* release locks just after they are obtained. */
int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
- int *flags, ldlm_policy_data_t *policy,
+ __u64 *flags, ldlm_policy_data_t *policy,
struct ost_lvb *lvb, int kms_valid,
obd_enqueue_update_f upcall, void *cookie,
struct ldlm_enqueue_info *einfo,
struct lustre_handle *lockh,
- struct ptlrpc_request_set *rqset, int async)
+ struct ptlrpc_request_set *rqset, int async, int agl)
{
struct obd_device *obd = exp->exp_obd;
struct ptlrpc_request *req = NULL;
int intent = *flags & LDLM_FL_HAS_INTENT;
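+ /* An AGL request may match a lock whose LVB is not yet valid;
+ * all other matches must wait for LDLM_FL_LVB_READY. */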
+ int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
ldlm_mode_t mode;
int rc;
ENTRY;
mode = einfo->ei_mode;
if (einfo->ei_mode == LCK_PR)
mode |= LCK_PW;
- mode = ldlm_lock_match(obd->obd_namespace,
- *flags | LDLM_FL_LVB_READY, res_id,
+ mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
einfo->ei_type, policy, mode, lockh, 0);
if (mode) {
struct ldlm_lock *matched = ldlm_handle2lock(lockh);
- if (osc_set_lock_data_with_check(matched, einfo)) {
+ if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
+ /* For AGL, if an enqueue RPC has been sent but the lock
+ * was not granted, skip processing this stripe.
+ * Return -ECANCELED to tell the caller. */
+ ldlm_lock_decref(lockh, mode);
+ LDLM_LOCK_PUT(matched);
+ RETURN(-ECANCELED);
+ } else if (osc_set_lock_data_with_check(matched, einfo)) {
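+ /* The matched lock is LVB-ready here: non-AGL matches
+ * required LDLM_FL_LVB_READY above, and the not-ready AGL
+ * case has already returned -ECANCELED. */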
+ *flags |= LDLM_FL_LVB_READY;
/* addref the lock only if not async requests and PW
* lock is matched whereas we asked for PR. */
if (!rqset && einfo->ei_mode != mode)
* are explained in lov_enqueue() */
}
- /* We already have a lock, and it's referenced */
+ /* We already have a lock, and it's referenced.
+ *
+ * At this point, the cl_lock::cll_state is CLS_QUEUING,
+ * AGL upcall may change it to CLS_HELD directly. */
(*upcall)(cookie, ELDLM_OK);
- /* For async requests, decref the lock. */
if (einfo->ei_mode != mode)
ldlm_lock_decref(lockh, LCK_PW);
else if (rqset)
+ /* For async requests, decref the lock. */
ldlm_lock_decref(lockh, einfo->ei_mode);
LDLM_LOCK_PUT(matched);
RETURN(ELDLM_OK);
- } else
+ } else {
ldlm_lock_decref(lockh, mode);
- LDLM_LOCK_PUT(matched);
+ LDLM_LOCK_PUT(matched);
+ }
}
no_match:
*flags &= ~LDLM_FL_BLOCK_GRANTED;
rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
- sizeof(*lvb), lockh, async);
+ sizeof(*lvb), LVB_T_OST, lockh, async);
if (rqset) {
if (!rc) {
struct osc_enqueue_args *aa;
aa->oa_cookie = cookie;
aa->oa_lvb = lvb;
aa->oa_lockh = lockh;
+ aa->oa_agl = !!agl;
req->rq_interpret_reply =
(ptlrpc_interpterer_t)osc_enqueue_interpret;
RETURN(rc);
}
- rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc);
+ rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
if (intent)
ptlrpc_req_finished(req);
&oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
- rqset, rqset != NULL);
+ rqset, rqset != NULL, 0);
RETURN(rc);
}
struct ptlrpc_request *req,
struct osc_async_args *aa, int rc)
{
- struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
struct obd_statfs *msfs;
- __u64 used;
ENTRY;
if (rc == -EBADR)
GOTO(out, rc = -EPROTO);
}
- /* Reinitialize the RDONLY and DEGRADED flags at the client
- * on each statfs, so they don't stay set permanently. */
- cfs_spin_lock(&cli->cl_oscc.oscc_lock);
-
- if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
- cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
- else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
- cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
-
- if (unlikely(msfs->os_state & OS_STATE_READONLY))
- cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
- else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
- cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
-
- /* Add a bit of hysteresis so this flag isn't continually flapping,
- * and ensure that new files don't get extremely fragmented due to
- * only a small amount of available space in the filesystem.
- * We want to set the NOSPC flag when there is less than ~0.1% free
- * and clear it when there is at least ~0.2% free space, so:
- * avail < ~0.1% max max = avail + used
- * 1025 * avail < avail + used used = blocks - free
- * 1024 * avail < used
- * 1024 * avail < blocks - free
- * avail < ((blocks - free) >> 10)
- *
- * On a very large disk, say 16TB, 0.1% will be 16 GB. We don't
- * want to lose that amount of space, so in those cases we report
- * no space left if there is less than 1 GB left. */
- used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
- if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
- ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
- cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
- else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
- (msfs->os_ffree > 64) &&
- (msfs->os_bavail > (used << 1)))) {
- cli->cl_oscc.oscc_flags &= ~(OSCC_FLAG_NOSPC |
- OSCC_FLAG_NOSPC_BLK);
- }
-
- if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
- (msfs->os_bavail < used)))
- cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC_BLK;
-
- cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
-
*aa->aa_oi->oi_osfs = *msfs;
out:
rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
RETURN(rc);
}
-static int osc_statfs_async(struct obd_device *obd, struct obd_info *oinfo,
- __u64 max_age, struct ptlrpc_request_set *rqset)
+static int osc_statfs_async(struct obd_export *exp,
+ struct obd_info *oinfo, __u64 max_age,
+ struct ptlrpc_request_set *rqset)
{
+ struct obd_device *obd = class_exp2obd(exp);
struct ptlrpc_request *req;
struct osc_async_args *aa;
int rc;
RETURN(0);
}
-static int osc_statfs(struct obd_device *obd, struct obd_statfs *osfs,
- __u64 max_age, __u32 flags)
+static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
+ struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
+ struct obd_device *obd = class_exp2obd(exp);
struct obd_statfs *msfs;
struct ptlrpc_request *req;
struct obd_import *imp = NULL;
/* Since the request might also come from lprocfs, we need to
 * sync this with client_disconnect_export() (bug 15684). */
- cfs_down_read(&obd->u.cli.cl_sem);
+ down_read(&obd->u.cli.cl_sem);
if (obd->u.cli.cl_import)
imp = class_import_get(obd->u.cli.cl_import);
- cfs_up_read(&obd->u.cli.cl_sem);
+ up_read(&obd->u.cli.cl_sem);
if (!imp)
RETURN(-ENODEV);
data->ioc_offset);
GOTO(out, err);
case OBD_IOC_POLL_QUOTACHECK:
- err = lquota_poll_check(quota_interface, exp,
- (struct if_quotacheck *)karg);
+ err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
GOTO(out, err);
case OBD_IOC_PING_TARGET:
err = ptlrpc_obd_ping(obd);
return err;
}
-static int osc_get_info(struct obd_export *exp, obd_count keylen,
- void *key, __u32 *vallen, void *val,
+static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
+ obd_count keylen, void *key, __u32 *vallen, void *val,
struct lov_stripe_md *lsm)
{
ENTRY;
RETURN(-EINVAL);
}
-static int osc_setinfo_mds_connect_import(struct obd_import *imp)
-{
- struct llog_ctxt *ctxt;
- int rc = 0;
- ENTRY;
-
- ctxt = llog_get_context(imp->imp_obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt) {
- rc = llog_initiator_connect(ctxt);
- llog_ctxt_put(ctxt);
- } else {
- /* XXX return an error? skip setting below flags? */
- }
-
- cfs_spin_lock(&imp->imp_lock);
- imp->imp_server_timeout = 1;
- imp->imp_pingable = 1;
- cfs_spin_unlock(&imp->imp_lock);
- CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
-
- RETURN(rc);
-}
-
-static int osc_setinfo_mds_conn_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- void *aa, int rc)
-{
- ENTRY;
- if (rc != 0)
- RETURN(rc);
-
- RETURN(osc_setinfo_mds_connect_import(req->rq_import));
-}
-
-static int osc_set_info_async(struct obd_export *exp, obd_count keylen,
- void *key, obd_count vallen, void *val,
- struct ptlrpc_request_set *set)
+static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
+ obd_count keylen, void *key, obd_count vallen,
+ void *val, struct ptlrpc_request_set *set)
{
struct ptlrpc_request *req;
struct obd_device *obd = exp->exp_obd;
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
- if (KEY_IS(KEY_NEXT_ID)) {
- obd_id new_val;
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
-
- if (vallen != sizeof(obd_id))
- RETURN(-ERANGE);
- if (val == NULL)
- RETURN(-EINVAL);
-
- if (vallen != sizeof(obd_id))
- RETURN(-EINVAL);
-
- /* avoid race between allocate new object and set next id
- * from ll_sync thread */
- cfs_spin_lock(&oscc->oscc_lock);
- new_val = *((obd_id*)val) + 1;
- if (new_val > oscc->oscc_next_id)
- oscc->oscc_next_id = new_val;
- cfs_spin_unlock(&oscc->oscc_lock);
- CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
- exp->exp_obd->obd_name,
- obd->u.cli.cl_oscc.oscc_next_id);
-
- RETURN(0);
- }
-
if (KEY_IS(KEY_CHECKSUM)) {
if (vallen != sizeof(int))
RETURN(-EINVAL);
RETURN(0);
}
+ if (KEY_IS(KEY_CACHE_SET)) {
+ struct client_obd *cli = &obd->u.cli;
+
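+ /* Bind this OSC to the client-wide page cache handed down by
+ * the upper layer, and join the per-cache list of OSCs that
+ * share the LRU budget. */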
+ LASSERT(cli->cl_cache == NULL); /* only once */
+ cli->cl_cache = (struct cl_client_cache *)val;
+ cfs_atomic_inc(&cli->cl_cache->ccc_users);
+ cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
+
+ /* add this osc into entity list */
+ LASSERT(cfs_list_empty(&cli->cl_lru_osc));
+ spin_lock(&cli->cl_cache->ccc_lru_lock);
+ cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
+ spin_unlock(&cli->cl_cache->ccc_lru_lock);
+
+ RETURN(0);
+ }
+
+ if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
+ struct client_obd *cli = &obd->u.cli;
+ int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
+ int target = *(int *)val;
+
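+ /* Reclaim at most half of this OSC's LRU pages per call,
+ * bounded by the caller's remaining target, and report how
+ * many pages were actually freed. */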
+ nr = osc_lru_shrink(cli, min(nr, target));
+ *(int *)val -= nr;
+ RETURN(0);
+ }
+
if (!set && !KEY_IS(KEY_GRANT_SHRINK))
RETURN(-EINVAL);
Even if something bad goes through, we'd get a -EINVAL from OST
anyway. */
- if (KEY_IS(KEY_GRANT_SHRINK))
- req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
- else
- req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
-
- if (req == NULL)
- RETURN(-ENOMEM);
-
- req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
- RCL_CLIENT, keylen);
- req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
- RCL_CLIENT, vallen);
- rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
- if (rc) {
- ptlrpc_request_free(req);
- RETURN(rc);
- }
-
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
- memcpy(tmp, key, keylen);
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
+ req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
+ &RQF_OST_SET_GRANT_INFO :
+ &RQF_OBD_SET_INFO);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
+ RCL_CLIENT, keylen);
+ if (!KEY_IS(KEY_GRANT_SHRINK))
+ req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
+ RCL_CLIENT, vallen);
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
+ if (rc) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+
+ tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+ memcpy(tmp, key, keylen);
+ tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
+ &RMF_OST_BODY :
+ &RMF_SETINFO_VAL);
memcpy(tmp, val, vallen);
- if (KEY_IS(KEY_MDS_CONN)) {
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
-
- oscc->oscc_oa.o_seq = (*(__u32 *)val);
- oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
- LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
- req->rq_no_delay = req->rq_no_resend = 1;
- req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
- } else if (KEY_IS(KEY_GRANT_SHRINK)) {
+ if (KEY_IS(KEY_GRANT_SHRINK)) {
struct osc_grant_args *aa;
struct obdo *oa;
}
-static struct llog_operations osc_size_repl_logops = {
- lop_cancel: llog_obd_repl_cancel
-};
-
-static struct llog_operations osc_mds_ost_orig_logops;
-
-static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
- struct obd_device *tgt, struct llog_catid *catid)
-{
- int rc;
- ENTRY;
-
- rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
- &catid->lci_logid, &osc_mds_ost_orig_logops);
- if (rc) {
- CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
- GOTO(out, rc);
- }
-
- rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
- NULL, &osc_size_repl_logops);
- if (rc) {
- struct llog_ctxt *ctxt =
- llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt)
- llog_cleanup(ctxt);
- CERROR("failed LLOG_SIZE_REPL_CTXT\n");
- }
- GOTO(out, rc);
-out:
- if (rc) {
- CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
- obd->obd_name, tgt->obd_name, catid, rc);
- CERROR("logid "LPX64":0x%x\n",
- catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
- }
- return rc;
-}
-
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
struct obd_device *disk_obd, int *index)
{
- struct llog_catid catid;
- static char name[32] = CATLIST;
- int rc;
- ENTRY;
-
- LASSERT(olg == &obd->obd_olg);
-
- cfs_mutex_down(&olg->olg_cat_processing);
- rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
- if (rc) {
- CERROR("rc: %d\n", rc);
- GOTO(out, rc);
- }
-
- CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
- obd->obd_name, *index, catid.lci_logid.lgl_oid,
- catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
-
- rc = __osc_llog_init(obd, olg, disk_obd, &catid);
- if (rc) {
- CERROR("rc: %d\n", rc);
- GOTO(out, rc);
- }
-
- rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
- if (rc) {
- CERROR("rc: %d\n", rc);
- GOTO(out, rc);
- }
-
- out:
- cfs_mutex_up(&olg->olg_cat_processing);
-
- return rc;
+ /* this code is not supposed to be used with LOD/OSP
+ * and is to be removed soon */
+ LBUG();
+ return 0;
}
static int osc_llog_finish(struct obd_device *obd, int count)
{
- struct llog_ctxt *ctxt;
- int rc = 0, rc2 = 0;
- ENTRY;
+ struct llog_ctxt *ctxt;
- ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
- if (ctxt)
- rc = llog_cleanup(ctxt);
+ ENTRY;
- ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
- if (ctxt)
- rc2 = llog_cleanup(ctxt);
- if (!rc)
- rc = rc2;
+ ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
+ if (ctxt) {
+ llog_cat_close(NULL, ctxt->loc_handle);
+ llog_cleanup(NULL, ctxt);
+ }
- RETURN(rc);
+ ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
+ if (ctxt)
+ llog_cleanup(NULL, ctxt);
+ RETURN(0);
}
static int osc_reconnect(const struct lu_env *env,
cli->cl_lost_grant = 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
- "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
- cli->cl_avail_grant, cli->cl_dirty, lost_grant);
CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
- " ocd_grant: %d\n", data->ocd_connect_flags,
- data->ocd_version, data->ocd_grant);
- }
+ " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
+ data->ocd_version, data->ocd_grant, lost_grant);
+ }
- RETURN(0);
+ RETURN(0);
}
static int osc_disconnect(struct obd_export *exp)
if (obd->u.cli.cl_conn_count == 1) {
/* Flush any remaining cancel messages out to the
* target */
- llog_sync(ctxt, exp);
+ llog_sync(ctxt, exp, 0);
}
llog_ctxt_put(ctxt);
} else {
switch (event) {
case IMP_EVENT_DISCON: {
- /* Only do this on the MDS OSC's */
- if (imp->imp_server_timeout) {
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
-
- cfs_spin_lock(&oscc->oscc_lock);
- oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
- cfs_spin_unlock(&oscc->oscc_lock);
- }
cli = &obd->u.cli;
client_obd_list_lock(&cli->cl_loi_list_lock);
cli->cl_avail_grant = 0;
if (!IS_ERR(env)) {
/* Reset grants */
cli = &obd->u.cli;
- client_obd_list_lock(&cli->cl_loi_list_lock);
/* all pages go to failing rpcs due to the invalid
* import */
- osc_check_rpcs(env, cli);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
+ osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
cl_env_put(env, &refcheck);
break;
}
case IMP_EVENT_ACTIVE: {
- /* Only do this on the MDS OSC's */
- if (imp->imp_server_timeout) {
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
-
- cfs_spin_lock(&oscc->oscc_lock);
- oscc->oscc_flags &= ~(OSCC_FLAG_NOSPC |
- OSCC_FLAG_NOSPC_BLK);
- cfs_spin_unlock(&oscc->oscc_lock);
- }
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
break;
}
RETURN(0);
}
-int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+static int brw_queue_work(const struct lu_env *env, void *data)
{
- int rc;
- ENTRY;
-
- ENTRY;
- rc = ptlrpcd_addref();
- if (rc)
- RETURN(rc);
+ struct client_obd *cli = data;
- rc = client_obd_setup(obd, lcfg);
- if (rc) {
- ptlrpcd_decref();
- } else {
- struct lprocfs_static_vars lvars = { 0 };
- struct client_obd *cli = &obd->u.cli;
+ CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
- cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
- lprocfs_osc_init_vars(&lvars);
- if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
- lproc_osc_attach_seqstat(obd);
- sptlrpc_lprocfs_cliobd_attach(obd);
- ptlrpc_lprocfs_register_obd(obd);
- }
-
- oscc_init(obd);
- /* We need to allocate a few requests more, because
- brw_interpret tries to create new requests before freeing
- previous ones. Ideally we want to have 2x max_rpcs_in_flight
- reserved, but I afraid that might be too much wasted RAM
- in fact, so 2 is just my guess and still should work. */
- cli->cl_import->imp_rq_pool =
- ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
- OST_MAXREQSIZE,
- ptlrpc_add_rqs_to_pool);
-
- CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
- cfs_sema_init(&cli->cl_grant_sem, 1);
-
- ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
- }
+ osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
+ RETURN(0);
+}
- RETURN(rc);
+int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
+{
+ struct lprocfs_static_vars lvars = { 0 };
+ struct client_obd *cli = &obd->u.cli;
+ void *handler;
+ int rc;
+ ENTRY;
+
+ rc = ptlrpcd_addref();
+ if (rc)
+ RETURN(rc);
+
+ rc = client_obd_setup(obd, lcfg);
+ if (rc)
+ GOTO(out_ptlrpcd, rc);
+
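+ /* Set up a ptlrpcd work item to flush this client's dirty
+ * pages asynchronously (see brw_queue_work() above). */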
+ handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
+ if (IS_ERR(handler))
+ GOTO(out_client_setup, rc = PTR_ERR(handler));
+ cli->cl_writeback_work = handler;
+
+ rc = osc_quota_setup(obd);
+ if (rc)
+ GOTO(out_ptlrpcd_work, rc);
+
+ cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
+ lprocfs_osc_init_vars(&lvars);
+ if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
+ lproc_osc_attach_seqstat(obd);
+ sptlrpc_lprocfs_cliobd_attach(obd);
+ ptlrpc_lprocfs_register_obd(obd);
+ }
+
+ /* We need to allocate a few requests more, because
+ * brw_interpret tries to create new requests before freeing
+ * previous ones. Ideally we want to have 2x max_rpcs_in_flight
+ * reserved, but I'm afraid that might be too much wasted RAM
+ * in practice, so 2 is just my guess and should still work. */
+ cli->cl_import->imp_rq_pool =
+ ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
+ OST_MAXREQSIZE,
+ ptlrpc_add_rqs_to_pool);
+
+ CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
+ ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
+ RETURN(rc);
+
+out_ptlrpcd_work:
+ ptlrpcd_destroy_work(handler);
+out_client_setup:
+ client_obd_cleanup(obd);
+out_ptlrpcd:
+ ptlrpcd_decref();
+ RETURN(rc);
}
static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
ptlrpc_deactivate_import(imp);
- cfs_spin_lock(&imp->imp_lock);
- imp->imp_pingable = 0;
- cfs_spin_unlock(&imp->imp_lock);
+ spin_lock(&imp->imp_lock);
+ imp->imp_pingable = 0;
+ spin_unlock(&imp->imp_lock);
break;
}
case OBD_CLEANUP_EXPORTS: {
+ struct client_obd *cli = &obd->u.cli;
/* LU-464
* for echo client, export may be on zombie list, wait for
* zombie thread to cull it, because cli.cl_import will be
* destroyed in client_disconnect_export()
*/
obd_zombie_barrier();
+ if (cli->cl_writeback_work) {
+ ptlrpcd_destroy_work(cli->cl_writeback_work);
+ cli->cl_writeback_work = NULL;
+ }
obd_cleanup_client_import(obd);
+ ptlrpc_lprocfs_unregister_obd(obd);
+ lprocfs_obd_cleanup(obd);
rc = obd_llog_finish(obd, 0);
if (rc != 0)
CERROR("failed to cleanup llogging subsystems\n");
int osc_cleanup(struct obd_device *obd)
{
- int rc;
+ struct client_obd *cli = &obd->u.cli;
+ int rc;
- ENTRY;
- ptlrpc_lprocfs_unregister_obd(obd);
- lprocfs_obd_cleanup(obd);
+ ENTRY;
+
+ /* lru cleanup */
+ if (cli->cl_cache != NULL) {
+ LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
+ spin_lock(&cli->cl_cache->ccc_lru_lock);
+ cfs_list_del_init(&cli->cl_lru_osc);
+ spin_unlock(&cli->cl_cache->ccc_lru_lock);
+ cli->cl_lru_left = NULL;
+ cfs_atomic_dec(&cli->cl_cache->ccc_users);
+ cli->cl_cache = NULL;
+ }
/* free memory of osc quota cache */
- lquota_cleanup(quota_interface, obd);
+ osc_quota_cleanup(obd);
rc = client_obd_cleanup(obd);
.o_statfs_async = osc_statfs_async,
.o_packmd = osc_packmd,
.o_unpackmd = osc_unpackmd,
- .o_precreate = osc_precreate,
.o_create = osc_create,
- .o_create_async = osc_create_async,
.o_destroy = osc_destroy,
.o_getattr = osc_getattr,
.o_getattr_async = osc_getattr_async,
.o_llog_init = osc_llog_init,
.o_llog_finish = osc_llog_finish,
.o_process_config = osc_process_config,
+ .o_quotactl = osc_quotactl,
+ .o_quotacheck = osc_quotacheck,
};
extern struct lu_kmem_descr osc_caches[];
-extern cfs_spinlock_t osc_ast_guard;
-extern cfs_lock_class_key_t osc_ast_guard_class;
+extern spinlock_t osc_ast_guard;
+extern struct lock_class_key osc_ast_guard_class;
int __init osc_init(void)
{
/* print an address of _any_ initialized kernel symbol from this
* module, to allow debugging with gdb that doesn't support data
* symbols from modules.*/
- CDEBUG(D_CONSOLE, "Lustre OSC module (%p).\n", &osc_caches);
+ CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
rc = lu_kmem_init(osc_caches);
lprocfs_osc_init_vars(&lvars);
- cfs_request_module("lquota");
- quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
- lquota_init(quota_interface);
- init_obd_quota_ops(quota_interface, &osc_obd_ops);
-
rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
LUSTRE_OSC_NAME, &osc_device_type);
if (rc) {
- if (quota_interface)
- PORTAL_SYMBOL_PUT(osc_quota_interface);
lu_kmem_fini(osc_caches);
RETURN(rc);
}
- cfs_spin_lock_init(&osc_ast_guard);
- cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
-
- osc_mds_ost_orig_logops = llog_lvfs_ops;
- osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
- osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
- osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
- osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
+ spin_lock_init(&osc_ast_guard);
+ lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
- RETURN(rc);
+ RETURN(rc);
}
#ifdef __KERNEL__
static void /*__exit*/ osc_exit(void)
{
- lu_device_type_fini(&osc_device_type);
-
- lquota_exit(quota_interface);
- if (quota_interface)
- PORTAL_SYMBOL_PUT(osc_quota_interface);
-
- class_unregister_type(LUSTRE_OSC_NAME);
- lu_kmem_fini(osc_caches);
+ class_unregister_type(LUSTRE_OSC_NAME);
+ lu_kmem_fini(osc_caches);
}
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");