#include <libcfs/libcfs.h>
-#include <lustre_dlm.h>
-#include <lustre_net.h>
#include <lustre/lustre_user.h>
-#include <obd_cksum.h>
-#include <lustre_ha.h>
+
#include <lprocfs_status.h>
-#include <lustre_ioctl.h>
#include <lustre_debug.h>
-#include <lustre_param.h>
+#include <lustre_dlm.h>
#include <lustre_fid.h>
-#include <obd_class.h>
-#include <obd.h>
+#include <lustre_ha.h>
+#include <lustre_ioctl.h>
#include <lustre_net.h>
-#include "osc_internal.h"
+#include <lustre_obdo.h>
+#include <lustre_param.h>
+#include <obd.h>
+#include <obd_cksum.h>
+#include <obd_class.h>
+
#include "osc_cl_internal.h"
+#include "osc_internal.h"
atomic_t osc_pool_req_count;
unsigned int osc_reqpool_maxreqcount;
void *fa_cookie;
};
+struct osc_ladvise_args {
+ struct obdo *la_oa;
+ obd_enqueue_update_f la_upcall;
+ void *la_cookie;
+};
+
struct osc_enqueue_args {
struct obd_export *oa_exp;
enum ldlm_type oa_type;
RETURN(0);
}
+static int osc_ladvise_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ void *arg, int rc)
+{
+ struct osc_ladvise_args *la = arg;
+ struct ost_body *body;
+ ENTRY;
+
+ if (rc != 0)
+ GOTO(out, rc);
+
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL)
+ GOTO(out, rc = -EPROTO);
+
+ *la->la_oa = body->oa;
+out:
+ rc = la->la_upcall(la->la_cookie, rc);
+ RETURN(rc);
+}
+
+/**
+ * If rqset is NULL, do not wait for response. Upcall and cookie could also
+ * be NULL in this case.
+ */
+int osc_ladvise_base(struct obd_export *exp, struct obdo *oa,
+ struct ladvise_hdr *ladvise_hdr,
+ obd_enqueue_update_f upcall, void *cookie,
+ struct ptlrpc_request_set *rqset)
+{
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ struct osc_ladvise_args *la;
+ int rc;
+ struct lu_ladvise *req_ladvise;
+ struct lu_ladvise *ladvise = ladvise_hdr->lah_advise;
+ int num_advise = ladvise_hdr->lah_count;
+ struct ladvise_hdr *req_ladvise_hdr;
+ ENTRY;
+
+ req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_LADVISE);
+ if (req == NULL)
+ RETURN(-ENOMEM);
+
+ req_capsule_set_size(&req->rq_pill, &RMF_OST_LADVISE, RCL_CLIENT,
+ num_advise * sizeof(*ladvise));
+ rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_LADVISE);
+ if (rc != 0) {
+ ptlrpc_request_free(req);
+ RETURN(rc);
+ }
+ req->rq_request_portal = OST_IO_PORTAL;
+ ptlrpc_at_set_req_timeout(req);
+
+ body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+ LASSERT(body);
+ lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
+ oa);
+
+ req_ladvise_hdr = req_capsule_client_get(&req->rq_pill,
+ &RMF_OST_LADVISE_HDR);
+ memcpy(req_ladvise_hdr, ladvise_hdr, sizeof(*ladvise_hdr));
+
+ req_ladvise = req_capsule_client_get(&req->rq_pill, &RMF_OST_LADVISE);
+ memcpy(req_ladvise, ladvise, sizeof(*ladvise) * num_advise);
+ ptlrpc_request_set_replen(req);
+
+ if (rqset == NULL) {
+ /* Do not wait for response. */
+ ptlrpcd_add_req(req);
+ RETURN(0);
+ }
+
+ req->rq_interpret_reply = osc_ladvise_interpret;
+ CLASSERT(sizeof(*la) <= sizeof(req->rq_async_args));
+ la = ptlrpc_req_async_args(req);
+ la->la_oa = oa;
+ la->la_upcall = upcall;
+ la->la_cookie = cookie;
+
+ if (rqset == PTLRPCD_SET)
+ ptlrpcd_add_req(req);
+ else
+ ptlrpc_set_add_req(rqset, req);
+
+ RETURN(0);
+}
+
static int osc_create(const struct lu_env *env, struct obd_export *exp,
struct obdo *oa)
{
oa->o_valid |= bits;
spin_lock(&cli->cl_loi_list_lock);
- oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
+ if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data, GRANT_PARAM))
+ oa->o_dirty = cli->cl_dirty_grant;
+ else
+ oa->o_dirty = cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
if (unlikely(cli->cl_dirty_pages - cli->cl_dirty_transit >
cli->cl_dirty_max_pages)) {
CERROR("dirty %lu - %lu > dirty_max %lu\n",
cli->cl_dirty_pages, cli->cl_dirty_max_pages);
oa->o_undirty = 0;
} else {
- unsigned long max_in_flight = (cli->cl_max_pages_per_rpc <<
- PAGE_CACHE_SHIFT) *
- (cli->cl_max_rpcs_in_flight + 1);
- oa->o_undirty = max(cli->cl_dirty_max_pages << PAGE_CACHE_SHIFT,
- max_in_flight);
+ unsigned long nrpages;
+
+ nrpages = cli->cl_max_pages_per_rpc;
+ nrpages *= cli->cl_max_rpcs_in_flight + 1;
+ nrpages = max(nrpages, cli->cl_dirty_max_pages);
+ oa->o_undirty = nrpages << PAGE_CACHE_SHIFT;
+ if (OCD_HAS_FLAG(&cli->cl_import->imp_connect_data,
+ GRANT_PARAM)) {
+ int nrextents;
+
+ /* take extent tax into account when asking for more
+ * grant space */
+ nrextents = (nrpages + cli->cl_max_extent_pages - 1) /
+ cli->cl_max_extent_pages;
+ oa->o_undirty += nrextents * cli->cl_grant_extent_tax;
+ }
}
oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
oa->o_dropped = cli->cl_lost_grant;
spin_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
-
}
void osc_update_next_shrink(struct client_obd *cli)
* left EVICTED state, then cl_dirty_pages must be 0 already.
*/
spin_lock(&cli->cl_loi_list_lock);
- if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
- cli->cl_avail_grant = ocd->ocd_grant;
- else
- cli->cl_avail_grant = ocd->ocd_grant -
- (cli->cl_dirty_pages << PAGE_CACHE_SHIFT);
+ cli->cl_avail_grant = ocd->ocd_grant;
+ if (cli->cl_import->imp_state != LUSTRE_IMP_EVICTED) {
+ cli->cl_avail_grant -= cli->cl_reserved_grant;
+ if (OCD_HAS_FLAG(ocd, GRANT_PARAM))
+ cli->cl_avail_grant -= cli->cl_dirty_grant;
+ else
+ cli->cl_avail_grant -=
+ cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
+ }
if (cli->cl_avail_grant < 0) {
CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
cli->cl_avail_grant = ocd->ocd_grant;
}
- /* determine the appropriate chunk size used by osc_extent. */
- cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
+ if (OCD_HAS_FLAG(ocd, GRANT_PARAM)) {
+ u64 size;
+ int chunk_mask;
+
+ /* overhead for each extent insertion */
+ cli->cl_grant_extent_tax = ocd->ocd_grant_tax_kb << 10;
+ /* determine the appropriate chunk size used by osc_extent. */
+ cli->cl_chunkbits = max_t(int, PAGE_SHIFT,
+ ocd->ocd_grant_blkbits);
+ /* max_pages_per_rpc must be chunk aligned */
+ chunk_mask = ~((1 << (cli->cl_chunkbits - PAGE_SHIFT)) - 1);
+ cli->cl_max_pages_per_rpc = (cli->cl_max_pages_per_rpc +
+ ~chunk_mask) & chunk_mask;
+ /* determine maximum extent size, in #pages */
+ size = (u64)ocd->ocd_grant_max_blks << ocd->ocd_grant_blkbits;
+ cli->cl_max_extent_pages = size >> PAGE_SHIFT;
+ if (cli->cl_max_extent_pages == 0)
+ cli->cl_max_extent_pages = 1;
+ } else {
+ cli->cl_grant_extent_tax = 0;
+ cli->cl_chunkbits = PAGE_SHIFT;
+ cli->cl_max_extent_pages = DT_MAX_BRW_PAGES;
+ }
spin_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
- "chunk bits: %d.\n", cli_name(cli), cli->cl_avail_grant,
- cli->cl_lost_grant, cli->cl_chunkbits);
+ "chunk bits: %d cl_max_extent_pages: %d\n",
+ cli_name(cli),
+ cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits,
+ cli->cl_max_extent_pages);
if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
list_empty(&cli->cl_grant_shrink_list))
/* set/clear over quota flag for a uid/gid */
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
- unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
+ unsigned int qid[LL_MAXQUOTAS] =
+ {body->oa.o_uid, body->oa.o_gid};
CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
bool soft_sync = false;
bool interrupted = false;
int i;
+ int grant = 0;
int rc;
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
struct ost_body *body;
list_for_each_entry(ext, ext_list, oe_link) {
LASSERT(ext->oe_state == OES_RPC);
mem_tight |= ext->oe_memalloc;
+ grant += ext->oe_grants;
page_count += ext->oe_nr_pages;
if (obj == NULL)
obj = ext->oe_obj;
crattr->cra_oa = oa;
cl_req_attr_set(env, osc2cl(obj), crattr);
+ if (cmd == OBD_BRW_WRITE)
+ oa->o_grant_used = grant;
+
sort_brw_pages(pga, page_count);
rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
if (rc != 0) {
DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %ur/%uw in flight",
page_count, aa, cli->cl_r_in_flight,
cli->cl_w_in_flight);
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_IO, 4);
ptlrpcd_add_req(req);
rc = 0;
RETURN(rc);
}
-static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
- struct ldlm_enqueue_info *einfo)
+static int osc_set_lock_data(struct ldlm_lock *lock, void *data)
{
- void *data = einfo->ei_cbdata;
int set = 0;
LASSERT(lock != NULL);
- LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
- LASSERT(lock->l_resource->lr_type == einfo->ei_type);
- LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
- LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
lock_res_and_lock(lock);
return set;
}
-static int osc_set_data_with_check(struct lustre_handle *lockh,
- struct ldlm_enqueue_info *einfo)
-{
- struct ldlm_lock *lock = ldlm_handle2lock(lockh);
- int set = 0;
-
- if (lock != NULL) {
- set = osc_set_lock_data_with_check(lock, einfo);
- LDLM_LOCK_PUT(lock);
- } else
- CERROR("lockh %p, data %p - client evicted?\n",
- lockh, einfo->ei_cbdata);
- return set;
-}
-
static int osc_enqueue_fini(struct ptlrpc_request *req,
osc_enqueue_upcall_f upcall, void *cookie,
struct lustre_handle *lockh, enum ldlm_mode mode,
struct lustre_handle lockh = { 0 };
struct ptlrpc_request *req = NULL;
int intent = *flags & LDLM_FL_HAS_INTENT;
- __u64 match_lvb = agl ? 0 : LDLM_FL_LVB_READY;
+ __u64 match_flags = *flags;
enum ldlm_mode mode;
int rc;
ENTRY;
mode = einfo->ei_mode;
if (einfo->ei_mode == LCK_PR)
mode |= LCK_PW;
- mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
+ if (agl == 0)
+ match_flags |= LDLM_FL_LVB_READY;
+ if (intent != 0)
+ match_flags |= LDLM_FL_BLOCK_GRANTED;
+ mode = ldlm_lock_match(obd->obd_namespace, match_flags, res_id,
einfo->ei_type, policy, mode, &lockh, 0);
if (mode) {
struct ldlm_lock *matched;
ldlm_lock_decref(&lockh, mode);
LDLM_LOCK_PUT(matched);
RETURN(-ECANCELED);
- } else if (osc_set_lock_data_with_check(matched, einfo)) {
+ } else if (osc_set_lock_data(matched, einfo->ei_cbdata)) {
*flags |= LDLM_FL_LVB_READY;
/* We already have a lock, and it's referenced. */
rc |= LCK_PW;
rc = ldlm_lock_match(obd->obd_namespace, lflags,
res_id, type, policy, rc, lockh, unref);
- if (rc) {
- if (data != NULL) {
- if (!osc_set_data_with_check(lockh, data)) {
- if (!(lflags & LDLM_FL_TEST_LOCK))
- ldlm_lock_decref(lockh, rc);
- RETURN(0);
- }
- }
- if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
- ldlm_lock_addref(lockh, LCK_PR);
- ldlm_lock_decref(lockh, LCK_PW);
- }
- RETURN(rc);
- }
- RETURN(rc);
+ if (rc == 0 || lflags & LDLM_FL_TEST_LOCK)
+ RETURN(rc);
+
+ if (data != NULL) {
+ struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+
+ LASSERT(lock != NULL);
+ if (!osc_set_lock_data(lock, data)) {
+ ldlm_lock_decref(lockh, rc);
+ rc = 0;
+ }
+ LDLM_LOCK_PUT(lock);
+ }
+ RETURN(rc);
}
static int osc_statfs_interpret(const struct lu_env *env,
}
static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
- void *karg, void *uarg)
+ void *karg, void __user *uarg)
{
struct obd_device *obd = exp->exp_obd;
struct obd_ioctl_data *data = karg;
if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
long lost_grant;
+ long grant;
spin_lock(&cli->cl_loi_list_lock);
- data->ocd_grant = (cli->cl_avail_grant +
- (cli->cl_dirty_pages << PAGE_CACHE_SHIFT)) ?:
- 2 * cli_brw_size(obd);
+ grant = cli->cl_avail_grant + cli->cl_reserved_grant;
+ if (data->ocd_connect_flags & OBD_CONNECT_GRANT_PARAM)
+ grant += cli->cl_dirty_grant;
+ else
+ grant += cli->cl_dirty_pages << PAGE_CACHE_SHIFT;
+ data->ocd_grant = grant ? : 2 * cli_brw_size(obd);
lost_grant = cli->cl_lost_grant;
cli->cl_lost_grant = 0;
spin_unlock(&cli->cl_loi_list_lock);
case IMP_EVENT_INVALIDATE: {
struct ldlm_namespace *ns = obd->obd_namespace;
struct lu_env *env;
- int refcheck;
+ __u16 refcheck;
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
RETURN(rc);
}
-static void /*__exit*/ osc_exit(void)
+static void __exit osc_exit(void)
{
remove_shrinker(osc_cache_shrinker);
class_unregister_type(LUSTRE_OSC_NAME);