* GPL HEADER END
*/
/*
- * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*/
/*
if (lsm) {
LASSERT(lsm->lsm_object_id);
- LASSERT_MDS_GROUP(lsm->lsm_object_gr);
+ LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
(*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
- (*lmmp)->lmm_object_gr = cpu_to_le64(lsm->lsm_object_gr);
+ (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
}
RETURN(lmm_size);
if (lmm != NULL) {
/* XXX zero *lsmp? */
(*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
- (*lsmp)->lsm_object_gr = le64_to_cpu (lmm->lmm_object_gr);
+ (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
LASSERT((*lsmp)->lsm_object_id);
- LASSERT_MDS_GROUP((*lsmp)->lsm_object_gr);
+ LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
}
(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- body->oa = *oinfo->oi_oa;
+ lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
osc_pack_capa(req, body, oinfo->oi_capa);
}
if (rc != 0)
GOTO(out, rc);
- body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body) {
CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
- memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));
+ lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
/* This should really be sent by the OST */
aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
GOTO(out, rc = -EPROTO);
CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
- *oinfo->oi_oa = body->oa;
+ lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
/* This should really be sent by the OST */
oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
int rc;
ENTRY;
- LASSERTF(!(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP) ||
- CHECK_MDS_GROUP(oinfo->oi_oa->o_gr),
- "oinfo->oi_oa->o_valid="LPU64" oinfo->oi_oa->o_gr="LPU64"\n",
- oinfo->oi_oa->o_valid, oinfo->oi_oa->o_gr);
+ LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
if (req == NULL)
if (body == NULL)
GOTO(out, rc = -EPROTO);
- *oinfo->oi_oa = body->oa;
+ lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
EXIT;
out:
static int osc_setattr_interpret(const struct lu_env *env,
struct ptlrpc_request *req,
- struct osc_async_args *aa, int rc)
+ struct osc_setattr_args *sa, int rc)
{
struct ost_body *body;
ENTRY;
if (body == NULL)
GOTO(out, rc = -EPROTO);
- *aa->aa_oi->oi_oa = body->oa;
+ lustre_get_wire_obdo(sa->sa_oa, &body->oa);
out:
- rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+ rc = sa->sa_upcall(sa->sa_cookie, rc);
RETURN(rc);
}
-static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
- struct obd_trans_info *oti,
- struct ptlrpc_request_set *rqset)
+int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
+ struct obd_trans_info *oti,
+ obd_enqueue_update_f upcall, void *cookie,
+ struct ptlrpc_request_set *rqset)
{
- struct ptlrpc_request *req;
- struct osc_async_args *aa;
- int rc;
+ struct ptlrpc_request *req;
+ struct osc_setattr_args *sa;
+ int rc;
ENTRY;
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
RETURN(rc);
}
+ if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
+ oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
+
osc_pack_req_body(req, oinfo);
ptlrpc_request_set_replen(req);
- if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {
- LASSERT(oti);
- oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
- }
-
/* do mds to ost setattr asynchronously */
if (!rqset) {
/* Do not wait for response. */
req->rq_interpret_reply =
(ptlrpc_interpterer_t)osc_setattr_interpret;
- CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- aa->aa_oi = oinfo;
+ CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
+ sa = ptlrpc_req_async_args(req);
+ sa->sa_oa = oinfo->oi_oa;
+ sa->sa_upcall = upcall;
+ sa->sa_cookie = cookie;
- ptlrpc_set_add_req(rqset, req);
+ if (rqset == PTLRPCD_SET)
+ ptlrpcd_add_req(req, PSCOPE_OTHER);
+ else
+ ptlrpc_set_add_req(rqset, req);
}
RETURN(0);
}
+/* Asynchronous setattr entry point: forwards to osc_setattr_async_base(),
+ * passing oinfo->oi_cb_up as the completion upcall and oinfo itself as the
+ * upcall cookie. Returns whatever osc_setattr_async_base() returns. */
+static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
+ struct obd_trans_info *oti,
+ struct ptlrpc_request_set *rqset)
+{
+ return osc_setattr_async_base(exp, oinfo, oti,
+ oinfo->oi_cb_up, oinfo, rqset);
+}
+
int osc_real_create(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- body->oa = *oa;
+ lustre_set_wire_obdo(&body->oa, oa);
ptlrpc_request_set_replen(req);
if (body == NULL)
GOTO(out_req, rc = -EPROTO);
- *oa = body->oa;
+ lustre_get_wire_obdo(oa, &body->oa);
/* This should really be sent by the OST */
oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
* This needs to be fixed in a big way.
*/
lsm->lsm_object_id = oa->o_id;
- lsm->lsm_object_gr = oa->o_gr;
+ lsm->lsm_object_seq = oa->o_seq;
*ea = lsm;
if (oti != NULL) {
RETURN(rc);
}
-static int osc_punch_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
- struct osc_punch_args *aa, int rc)
-{
- struct ost_body *body;
- ENTRY;
-
- if (rc != 0)
- GOTO(out, rc);
-
- body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- GOTO(out, rc = -EPROTO);
-
- *aa->pa_oa = body->oa;
-out:
- rc = aa->pa_upcall(aa->pa_cookie, rc);
- RETURN(rc);
-}
-
-int osc_punch_base(struct obd_export *exp, struct obdo *oa,
- struct obd_capa *capa,
+int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
obd_enqueue_update_f upcall, void *cookie,
struct ptlrpc_request_set *rqset)
{
- struct ptlrpc_request *req;
- struct osc_punch_args *aa;
- struct ost_body *body;
- int rc;
+ struct ptlrpc_request *req;
+ struct osc_setattr_args *sa;
+ struct ost_body *body;
+ int rc;
ENTRY;
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
if (req == NULL)
RETURN(-ENOMEM);
- osc_set_capa_size(req, &RMF_CAPA1, capa);
+ osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
if (rc) {
ptlrpc_request_free(req);
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- body->oa = *oa;
- osc_pack_capa(req, body, capa);
+ lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
+ osc_pack_capa(req, body, oinfo->oi_capa);
ptlrpc_request_set_replen(req);
- req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_punch_interpret;
- CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- aa->pa_oa = oa;
- aa->pa_upcall = upcall;
- aa->pa_cookie = cookie;
+ req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
+ CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
+ sa = ptlrpc_req_async_args(req);
+ sa->sa_oa = oinfo->oi_oa;
+ sa->sa_upcall = upcall;
+ sa->sa_cookie = cookie;
if (rqset == PTLRPCD_SET)
ptlrpcd_add_req(req, PSCOPE_OTHER);
else
oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
- return osc_punch_base(exp, oinfo->oi_oa, oinfo->oi_capa,
+ return osc_punch_base(exp, oinfo,
oinfo->oi_cb_up, oinfo, rqset);
}
/* overload the size and blocks fields in the oa with start/end */
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- body->oa = *oa;
+ lustre_set_wire_obdo(&body->oa, oa);
body->oa.o_size = start;
body->oa.o_blocks = end;
body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
if (body == NULL)
GOTO(out, rc = -EPROTO);
- *oa = body->oa;
+ lustre_get_wire_obdo(oa, &body->oa);
EXIT;
out:
* @objid. Found locks are added into @cancel list. Returns the amount of
* locks added to @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
- struct list_head *cancels, ldlm_mode_t mode,
- int lock_flags)
+ cfs_list_t *cancels,
+ ldlm_mode_t mode, int lock_flags)
{
struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
struct ldlm_res_id res_id;
int count;
ENTRY;
- osc_build_res_name(oa->o_id, oa->o_gr, &res_id);
+ osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
if (res == NULL)
RETURN(0);
{
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
- atomic_dec(&cli->cl_destroy_in_flight);
+ cfs_atomic_dec(&cli->cl_destroy_in_flight);
cfs_waitq_signal(&cli->cl_destroy_waitq);
return 0;
}
static int osc_can_send_destroy(struct client_obd *cli)
{
- if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
+ if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
cli->cl_max_rpcs_in_flight) {
/* The destroy request can be sent */
return 1;
}
- if (atomic_dec_return(&cli->cl_destroy_in_flight) <
+ if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
cli->cl_max_rpcs_in_flight) {
/*
* The counter has been modified between the two atomic
oa->o_lcookie = *oti->oti_logcookies;
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- body->oa = *oa;
+ lustre_set_wire_obdo(&body->oa, oa);
osc_pack_capa(req, body, (struct obd_capa *)capa);
ptlrpc_request_set_replen(req);
if (!(cli->cl_import->imp_connect_flags_orig & OBD_CONNECT_MDS)) {
req->rq_interpret_reply = osc_destroy_interpret;
if (!osc_can_send_destroy(cli)) {
- struct l_wait_info lwi = { 0 };
+ struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
+ NULL);
/*
* Wait until the number of on-going destroy RPCs drops
CERROR("dirty %lu - %lu > dirty_max %lu\n",
cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
oa->o_undirty = 0;
- } else if (atomic_read(&obd_dirty_pages) -
- atomic_read(&obd_dirty_transit_pages) > obd_max_dirty_pages){
+ } else if (cfs_atomic_read(&obd_dirty_pages) -
+ cfs_atomic_read(&obd_dirty_transit_pages) >
+ obd_max_dirty_pages + 1){
+ /* The cfs_atomic_read() calls here and the cfs_atomic_inc()
+ * elsewhere are not covered by a common lock, so they can race
+ * harmlessly and trip this CERROR() unless we add a small fudge
+ * factor (+1). */
CERROR("dirty %d - %d > system dirty_max %d\n",
- atomic_read(&obd_dirty_pages),
- atomic_read(&obd_dirty_transit_pages),
+ cfs_atomic_read(&obd_dirty_pages),
+ cfs_atomic_read(&obd_dirty_transit_pages),
obd_max_dirty_pages);
oa->o_undirty = 0;
} else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {
+/* Schedule the next grant-shrink check: cl_next_shrink_grant is set to
+ * cfs_time_shift() of this client's cl_grant_shrink_interval, and the new
+ * value is logged at D_CACHE. */
static void osc_update_next_shrink(struct client_obd *cli)
{
- int time = GRANT_SHRINK_INTERVAL;
- cli->cl_next_shrink_grant = cfs_time_shift(time);
+ cli->cl_next_shrink_grant =
+ cfs_time_shift(cli->cl_grant_shrink_interval);
CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
cli->cl_next_shrink_grant);
}
static void osc_consume_write_grant(struct client_obd *cli,
struct brw_page *pga)
{
- LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
+ LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
LASSERT(!(pga->flag & OBD_BRW_FROM_GRANT));
- atomic_inc(&obd_dirty_pages);
+ cfs_atomic_inc(&obd_dirty_pages);
cli->cl_dirty += CFS_PAGE_SIZE;
cli->cl_avail_grant -= CFS_PAGE_SIZE;
pga->flag |= OBD_BRW_FROM_GRANT;
int blocksize = cli->cl_import->imp_obd->obd_osfs.os_bsize ? : 4096;
ENTRY;
- LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock);
+ LASSERT_SPIN_LOCKED(&cli->cl_loi_list_lock.lock);
if (!(pga->flag & OBD_BRW_FROM_GRANT)) {
EXIT;
return;
}
pga->flag &= ~OBD_BRW_FROM_GRANT;
- atomic_dec(&obd_dirty_pages);
+ cfs_atomic_dec(&obd_dirty_pages);
cli->cl_dirty -= CFS_PAGE_SIZE;
if (pga->flag & OBD_BRW_NOCACHE) {
pga->flag &= ~OBD_BRW_NOCACHE;
- atomic_dec(&obd_dirty_transit_pages);
+ cfs_atomic_dec(&obd_dirty_transit_pages);
cli->cl_dirty_transit -= CFS_PAGE_SIZE;
}
if (!sent) {
return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
+/* Complete a pending sync_fs wait: when cl_loi_sync_fs_list is empty and
+ * cl_sf_wait.started is set, call cl_sf_wait.sfw_upcall(sfw_oi, 0) and clear
+ * the started flag. Always returns 0.
+ * NOTE(review): the caller visible in this change invokes it before
+ * cl_loi_list_lock is released - confirm the lock is required here. */
+int osc_wake_sync_fs(struct client_obd *cli)
+{
+ int rc = 0;
+ ENTRY;
+ if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
+ cli->cl_sf_wait.started) {
+ cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, rc);
+ cli->cl_sf_wait.started = 0;
+ CDEBUG(D_CACHE, "sync_fs_loi list is empty\n");
+ }
+ RETURN(rc);
+}
+
/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
- struct list_head *l, *tmp;
+ cfs_list_t *l, *tmp;
struct osc_cache_waiter *ocw;
ENTRY;
- list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
+ cfs_list_for_each_safe(l, tmp, &cli->cl_cache_waiters) {
/* if we can't dirty more, we must wait until some is written */
if ((cli->cl_dirty + CFS_PAGE_SIZE > cli->cl_dirty_max) ||
- (atomic_read(&obd_dirty_pages) + 1 > obd_max_dirty_pages)) {
+ (cfs_atomic_read(&obd_dirty_pages) + 1 >
+ obd_max_dirty_pages)) {
CDEBUG(D_CACHE, "no dirty room: dirty: %ld "
"osc max %ld, sys max %d\n", cli->cl_dirty,
cli->cl_dirty_max, obd_max_dirty_pages);
return;
}
- ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
- list_del_init(&ocw->ocw_entry);
+ ocw = cfs_list_entry(l, struct osc_cache_waiter, ocw_entry);
+ cfs_list_del_init(&ocw->ocw_entry);
if (cli->cl_avail_grant < CFS_PAGE_SIZE) {
/* no more RPCs in flight to return grant, do sync IO */
ocw->ocw_rc = -EDQUOT;
cli->cl_avail_grant += grant;
client_obd_list_unlock(&cli->cl_loi_list_lock);
}
-
+
static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
if (body->oa.o_valid & OBD_MD_FLGRANT) {
struct ptlrpc_request_set *set);
static int osc_shrink_grant_interpret(const struct lu_env *env,
- struct ptlrpc_request *req,
+ struct ptlrpc_request *req,
void *aa, int rc)
{
struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
struct ost_body *body;
-
+
if (rc != 0) {
__osc_update_grant(cli, oa->o_grant);
GOTO(out, rc);
LASSERT(body);
osc_update_grant(cli, body);
out:
- OBD_FREE_PTR(oa);
- return rc;
+ OBDO_FREE(oa);
+ return rc;
}
static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
client_obd_list_lock(&cli->cl_loi_list_lock);
oa->o_grant = cli->cl_avail_grant / 4;
- cli->cl_avail_grant -= oa->o_grant;
+ cli->cl_avail_grant -= oa->o_grant;
client_obd_list_unlock(&cli->cl_loi_list_lock);
oa->o_flags |= OBD_FL_SHRINK_GRANT;
osc_update_next_shrink(cli);
}
+/* Shrink the current grant, either from some large amount to enough for a
+ * full set of in-flight RPCs, or if we have already shrunk to that limit
+ * then to enough for a single RPC. This avoids keeping more grant than
+ * needed, and avoids shrinking the grant piecemeal.
+ *
+ * cl_avail_grant is accounted in bytes (it is adjusted by CFS_PAGE_SIZE per
+ * page), so the page counts are converted to bytes before being compared
+ * with it or handed to osc_shrink_grant_to_target().
+ *
+ * Returns the result of osc_shrink_grant_to_target(). */
static int osc_shrink_grant(struct client_obd *cli)
{
+ long target = (cli->cl_max_rpcs_in_flight + 1) *
+ cli->cl_max_pages_per_rpc * CFS_PAGE_SIZE;
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ if (cli->cl_avail_grant <= target)
+ target = cli->cl_max_pages_per_rpc * CFS_PAGE_SIZE;
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ return osc_shrink_grant_to_target(cli, target);
+}
+
+/* Give grant back to the server until only @target bytes remain available.
+ *
+ * @target is in bytes, like cl_avail_grant; it is raised to at least one
+ * full RPC worth of bytes. Returns 0 when there is nothing to shrink,
+ * -ENOMEM when the request body cannot be allocated, otherwise the result
+ * of osc_set_info_async(); on failure the surrendered grant is restored
+ * through __osc_update_grant(). */
+int osc_shrink_grant_to_target(struct client_obd *cli, long target)
+{
int rc = 0;
struct ost_body *body;
ENTRY;
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ /* Don't shrink if we are already at or below the desired limit.
+ * We don't want to shrink below a single RPC, as that will negatively
+ * impact block allocation and long-term performance. The limit is
+ * converted from pages to bytes to match cl_avail_grant. */
+ if (target < cli->cl_max_pages_per_rpc * CFS_PAGE_SIZE)
+ target = cli->cl_max_pages_per_rpc * CFS_PAGE_SIZE;
+
+ if (target >= cli->cl_avail_grant) {
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ RETURN(0);
+ }
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
OBD_ALLOC_PTR(body);
if (!body)
RETURN(-ENOMEM);
osc_announce_cached(cli, &body->oa, 0);
- osc_shrink_grant_local(cli, &body->oa);
+
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ /* cl_avail_grant may have dropped while the lock was released for the
+ * allocation; re-check so the shrink amount can never go negative and
+ * cl_avail_grant is never raised by this function. */
+ if (target >= cli->cl_avail_grant) {
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ OBD_FREE_PTR(body);
+ RETURN(0);
+ }
+ body->oa.o_grant = cli->cl_avail_grant - target;
+ cli->cl_avail_grant = target;
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
+ osc_update_next_shrink(cli);
+
rc = osc_set_info_async(cli->cl_import->imp_obd->obd_self_export,
sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
sizeof(*body), body, NULL);
if (rc != 0)
__osc_update_grant(cli, body->oa.o_grant);
- if (body)
- OBD_FREE_PTR(body);
+ OBD_FREE_PTR(body);
RETURN(rc);
}
{
cfs_time_t time = cfs_time_current();
cfs_time_t next_shrink = client->cl_next_shrink_grant;
+
+ if ((client->cl_import->imp_connect_data.ocd_connect_flags &
+ OBD_CONNECT_GRANT_SHRINK) == 0)
+ return 0;
+
if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
client->cl_avail_grant > GRANT_SHRINK_LIMIT)
{
struct client_obd *client;
- list_for_each_entry(client, &item->ti_obd_list, cl_grant_shrink_list) {
+ cfs_list_for_each_entry(client, &item->ti_obd_list,
+ cl_grant_shrink_list) {
if (osc_should_shrink_grant(client))
osc_shrink_grant(client);
}
{
int rc;
- rc = ptlrpc_add_timeout_client(GRANT_SHRINK_INTERVAL,
- TIMEOUT_GRANT,
- osc_grant_shrink_grant_cb, NULL,
- &client->cl_grant_shrink_list);
+ rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
+ TIMEOUT_GRANT,
+ osc_grant_shrink_grant_cb, NULL,
+ &client->cl_grant_shrink_list);
if (rc) {
- CERROR("add grant client %s error %d\n",
+ CERROR("add grant client %s error %d\n",
client->cl_import->imp_obd->obd_name, rc);
return rc;
}
- CDEBUG(D_CACHE, "add grant client %s \n",
+ CDEBUG(D_CACHE, "add grant client %s \n",
client->cl_import->imp_obd->obd_name);
osc_update_next_shrink(client);
- return 0;
+ return 0;
}
static int osc_del_shrink_grant(struct client_obd *client)
{
- return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
+ return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
TIMEOUT_GRANT);
}
static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
+ /*
+ * ocd_grant is the total grant amount we're expected to hold: if we've
+ * been evicted, it's the new avail_grant amount, cl_dirty will drop
+ * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
+ *
+ * race is tolerable here: if we're evicted, but imp_state already
+ * left EVICTED state, then cl_dirty must be 0 already.
+ */
client_obd_list_lock(&cli->cl_loi_list_lock);
- cli->cl_avail_grant = ocd->ocd_grant;
+ if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
+ cli->cl_avail_grant = ocd->ocd_grant;
+ else
+ cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
+
+ if (cli->cl_avail_grant < 0) {
+ CWARN("%s: available grant < 0, the OSS is probably not running"
+ " with patch from bug20278 (%ld) \n",
+ cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
+ /* workaround for 1.6 servers which do not have
+ * the patch from bug20278 */
+ cli->cl_avail_grant = ocd->ocd_grant;
+ }
+
client_obd_list_unlock(&cli->cl_loi_list_lock);
+ CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
+ cli->cl_import->imp_obd->obd_name,
+ cli->cl_avail_grant, cli->cl_lost_grant);
+
if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
- list_empty(&cli->cl_grant_shrink_list))
+ cfs_list_empty(&cli->cl_grant_shrink_list))
osc_add_shrink_grant(cli);
-
- CDEBUG(D_CACHE, "setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
- cli->cl_avail_grant, cli->cl_lost_grant);
- LASSERT(cli->cl_avail_grant >= 0);
}
/* We assume that the reason this OSC got a short read is because it read
int requested_nob, int niocount,
obd_count page_count, struct brw_page **pga)
{
- int *remote_rcs, i;
+ int i;
+ __u32 *remote_rcs;
- /* return error if any niobuf was in error */
- remote_rcs = lustre_swab_repbuf(req, REQ_REC_OFF + 1,
- sizeof(*remote_rcs) * niocount, NULL);
+ remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
+ sizeof(*remote_rcs) *
+ niocount);
if (remote_rcs == NULL) {
CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
return(-EPROTO);
}
- if (lustre_msg_swabbed(req->rq_repmsg))
- for (i = 0; i < niocount; i++)
- __swab32s(&remote_rcs[i]);
+ /* return error if any niobuf was in error */
for (i = 0; i < niocount; i++) {
- if (remote_rcs[i] < 0)
- return(remote_rcs[i]);
+ /* remote_rcs[] is now __u32: view each entry as signed, otherwise
+ * the "< 0" test is always false and a negative errno reported
+ * by the server for a niobuf is silently ignored */
+ if ((int)remote_rcs[i] < 0)
+ return((int)remote_rcs[i]);
{
if (p1->flag != p2->flag) {
unsigned mask = ~(OBD_BRW_FROM_GRANT|
- OBD_BRW_NOCACHE|OBD_BRW_SYNC);
+ OBD_BRW_NOCACHE|OBD_BRW_SYNC|OBD_BRW_ASYNC);
/* warn if we try to combine flags that we don't know to be
* safe to combine */
opc = OST_WRITE;
req = ptlrpc_request_alloc_pool(cli->cl_import,
cli->cl_import->imp_rq_pool,
- &RQF_OST_BRW);
+ &RQF_OST_BRW_WRITE);
} else {
opc = OST_READ;
- req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW);
+ req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
}
if (req == NULL)
RETURN(-ENOMEM);
}
pill = &req->rq_pill;
+ req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
+ sizeof(*ioobj));
req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
niocount * sizeof(*niobuf));
osc_set_capa_size(req, &RMF_CAPA1, ocapa);
body = req_capsule_client_get(pill, &RMF_OST_BODY);
ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
- LASSERT(body && ioobj && niobuf);
+ LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
- body->oa = *oa;
+ lustre_set_wire_obdo(&body->oa, oa);
obdo_to_ioobj(oa, ioobj);
ioobj->ioo_bufcnt = niocount;
}
LASSERTF((void *)(niobuf - niocount) ==
- lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 2,
- niocount * sizeof(*niobuf)),
- "want %p - real %p\n", lustre_msg_buf(req->rq_reqmsg,
- REQ_REC_OFF + 2, niocount * sizeof(*niobuf)),
- (void *)(niobuf - niocount));
+ req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
+ "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
+ &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
if (osc_should_shrink_grant(cli))
- osc_shrink_grant_local(cli, &body->oa);
+ osc_shrink_grant_local(cli, &body->oa);
/* size[REQ_REC_OFF] still sizeof (*body) */
if (opc == OST_WRITE) {
* it can be changed via lprocfs */
cksum_type_t cksum_type = cli->cl_cksum_type;
- if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
- oa->o_flags = body->oa.o_flags = 0;
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+ oa->o_flags &= OBD_FL_LOCAL_MASK;
+ body->oa.o_flags = 0;
+ }
body->oa.o_flags |= cksum_type_pack(cksum_type);
body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
body->oa.o_cksum = osc_checksum_bulk(requested_nob,
}
oa->o_cksum = body->oa.o_cksum;
/* 1 RC per niobuf */
- req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER,
+ req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
sizeof(__u32) * niocount);
} else {
if (unlikely(cli->cl_checksum) &&
body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
}
- req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_SERVER, 0);
- /* 1 RC for the whole I/O */
}
ptlrpc_request_set_replen(req);
return 0;
}
+ /* If this is mmaped file - it can be changed at any time */
+ if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & OBD_FL_MMAP)
+ return 1;
+
if (oa->o_valid & OBD_MD_FLFLAGS)
cksum_type = cksum_type_unpack(oa->o_flags);
else
msg = "changed in transit AND doesn't match the original - "
"likely false positive due to mmap IO (bug 11742)";
- LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inum "
- LPU64"/"LPU64" object "LPU64"/"LPU64" extent "
- "["LPU64"-"LPU64"]\n",
+ LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
+ " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
msg, libcfs_nid2str(peer->nid),
- oa->o_valid & OBD_MD_FLFID ? oa->o_fid : (__u64)0,
- oa->o_valid & OBD_MD_FLFID ? oa->o_generation :
- (__u64)0,
+ oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
+ oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
+ oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
oa->o_id,
- oa->o_valid & OBD_MD_FLGROUP ? oa->o_gr : (__u64)0,
+ oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
pga[0]->off,
pga[page_count-1]->off + pga[page_count-1]->count - 1);
CERROR("original client csum %x (type %x), server csum %x (type %x), "
__u32 client_cksum = 0;
ENTRY;
- if (rc < 0 && rc != -EDQUOT)
+ if (rc < 0 && rc != -EDQUOT) {
+ DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
RETURN(rc);
+ }
LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
- body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
- lustre_swab_ost_body);
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
if (body == NULL) {
- CDEBUG(D_INFO, "Can't unpack body\n");
+ DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
RETURN(-EPROTO);
}
+#ifdef HAVE_QUOTA_SUPPORT
/* set/clear over quota flag for a uid/gid */
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
- body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA))
- lquota_setdq(quota_interface, cli, body->oa.o_uid,
- body->oa.o_gid, body->oa.o_valid,
+ body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
+ unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
+
+ CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
+ body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
+ body->oa.o_flags);
+ lquota_setdq(quota_interface, cli, qid, body->oa.o_valid,
body->oa.o_flags);
+ }
+#endif
+
+ osc_update_grant(cli, body);
if (rc < 0)
RETURN(rc);
if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
client_cksum = aa->aa_oa->o_cksum; /* save for later */
- osc_update_grant(cli, body);
-
if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
if (rc > 0) {
CERROR("Unexpected +ve rc %d\n", rc);
libcfs_nid2str(peer->nid));
} else if (server_cksum != client_cksum) {
LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
- "%s%s%s inum "LPU64"/"LPU64" object "
+ "%s%s%s inode "DFID" object "
LPU64"/"LPU64" extent "
"["LPU64"-"LPU64"]\n",
req->rq_import->imp_obd->obd_name,
libcfs_nid2str(peer->nid),
via, router,
body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_fid : (__u64)0,
+ body->oa.o_parent_seq : (__u64)0,
body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_generation :(__u64)0,
+ body->oa.o_parent_oid : 0,
+ body->oa.o_valid & OBD_MD_FLFID ?
+ body->oa.o_parent_ver : 0,
body->oa.o_id,
body->oa.o_valid & OBD_MD_FLGROUP ?
- body->oa.o_gr : (__u64)0,
+ body->oa.o_seq : (__u64)0,
aa->aa_ppga[0]->off,
aa->aa_ppga[aa->aa_page_count-1]->off +
aa->aa_ppga[aa->aa_page_count-1]->count -
}
out:
if (rc >= 0)
- *aa->aa_oa = body->oa;
+ lustre_get_wire_obdo(aa->aa_oa, &body->oa);
RETURN(rc);
}
ENTRY;
if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
- CERROR("too many resend retries, returning error\n");
+ CERROR("too many resent retries, returning error\n");
RETURN(-EIO);
}
client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
- list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+ cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
if (oap->oap_request != NULL) {
LASSERTF(request == oap->oap_request,
"request %p != oap_request %p\n",
new_aa = ptlrpc_req_async_args(new_req);
CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
- list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
+ cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
CFS_INIT_LIST_HEAD(&aa->aa_oaps);
- list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
+ cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
if (oap->oap_request) {
ptlrpc_req_finished(oap->oap_request);
oap->oap_request = ptlrpc_request_addref(new_req);
osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
+/* Return 1 when the first page on lop->lop_urgent carries ASYNC_SYNCFS,
+ * 0 when the urgent list is empty or its head lacks that flag. Only the
+ * head entry of the list is examined. */
+static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
+{
+ struct osc_async_page *oap;
+ ENTRY;
+
+ if (cfs_list_empty(&lop->lop_urgent))
+ RETURN(0);
+
+ oap = cfs_list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+
+ if (oap->oap_async_flags & ASYNC_SYNCFS) {
+ CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
+ RETURN(1);
+ }
+
+ RETURN(0);
+}
/* This maintains the lists of pending pages to read/write for a given object
* (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
* queued. this is our cheap solution for good batching in the case
* where writepage marks some random page in the middle of the file
* as urgent because of, say, memory pressure */
- if (!list_empty(&lop->lop_urgent)) {
+ if (!cfs_list_empty(&lop->lop_urgent)) {
CDEBUG(D_CACHE, "urgent request forcing RPC\n");
RETURN(1);
}
if (cmd & OBD_BRW_WRITE) {
/* trigger a write rpc stream as long as there are dirtiers
* waiting for space. as they're waiting, they're not going to
- * create more pages to coallesce with what's waiting.. */
- if (!list_empty(&cli->cl_cache_waiters)) {
+ * create more pages to coalesce with what's waiting.. */
+ if (!cfs_list_empty(&cli->cl_cache_waiters)) {
CDEBUG(D_CACHE, "cache waiters forcing RPC\n");
RETURN(1);
}
struct osc_async_page *oap;
ENTRY;
- if (list_empty(&lop->lop_urgent))
+ if (cfs_list_empty(&lop->lop_urgent))
RETURN(0);
- oap = list_entry(lop->lop_urgent.next,
+ oap = cfs_list_entry(lop->lop_urgent.next,
struct osc_async_page, oap_urgent_item);
if (oap->oap_async_flags & ASYNC_HP) {
RETURN(0);
}
+/* Bring @item's membership of @list in line with @should_be_on: an unlinked
+ * item (cfs_list_empty(item)) is appended to the tail when it should be on
+ * the list, a linked item is removed and re-initialised when it should not
+ * be; in every other case nothing changes. */
-static void on_list(struct list_head *item, struct list_head *list,
+static void on_list(cfs_list_t *item, cfs_list_t *list,
int should_be_on)
{
- if (list_empty(item) && should_be_on)
- list_add_tail(item, list);
- else if (!list_empty(item) && !should_be_on)
- list_del_init(item);
+ if (cfs_list_empty(item) && should_be_on)
+ cfs_list_add_tail(item, list);
+ else if (!cfs_list_empty(item) && !should_be_on)
+ cfs_list_del_init(item);
}
/* maintain the loi's cli list membership invariants so that osc_send_oap_rpc
on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
} else {
- on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
- on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
- lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
- lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
+ on_list(&loi->loi_sync_fs_item,
+ &cli->cl_loi_sync_fs_list,
+ loi->loi_write_lop.lop_num_pending);
+ } else {
+ on_list(&loi->loi_hp_ready_item,
+ &cli->cl_loi_hp_ready_list, 0);
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+ lop_makes_rpc(cli, &loi->loi_write_lop,
+ OBD_BRW_WRITE)||
+ lop_makes_rpc(cli, &loi->loi_read_lop,
+ OBD_BRW_READ));
+ }
}
on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
* page completion may be called only if ->cpo_prep() method was
* executed by osc_io_submit(), that also adds page the to pending list
*/
- if (!list_empty(&oap->oap_pending_item)) {
- list_del_init(&oap->oap_pending_item);
- list_del_init(&oap->oap_urgent_item);
+ if (!cfs_list_empty(&oap->oap_pending_item)) {
+ cfs_list_del_init(&oap->oap_pending_item);
+ cfs_list_del_init(&oap->oap_urgent_item);
loi = oap->oap_loi;
lop = (oap->oap_cmd & OBD_BRW_WRITE) ?
lop = &oap->oap_loi->loi_read_lop;
if (oap->oap_async_flags & ASYNC_HP)
- list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+ cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
else if (oap->oap_async_flags & ASYNC_URGENT)
- list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
- list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
+ cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+ cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}
oap->oap_request = NULL;
}
+ cfs_spin_lock(&oap->oap_lock);
oap->oap_async_flags = 0;
+ cfs_spin_unlock(&oap->oap_lock);
oap->oap_interrupted = 0;
if (oap->oap_cmd & OBD_BRW_WRITE) {
rc = osc_brw_fini_request(req, rc);
CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
if (osc_recoverable_error(rc)) {
- rc = osc_brw_redo_request(req, aa);
- if (rc == 0)
- RETURN(0);
+ /* Only retry once for mmaped files since the mmaped page
+ * might be modified at anytime. We have to retry at least
+ * once in case there WAS really a corruption of the page
+ * on the network, that was not caused by mmap() modifying
+ * the page. Bug11742 */
+ if ((rc == -EAGAIN) && (aa->aa_resends > 0) &&
+ aa->aa_oa->o_valid & OBD_MD_FLFLAGS &&
+ aa->aa_oa->o_flags & OBD_FL_MMAP) {
+ rc = 0;
+ } else {
+ rc = osc_brw_redo_request(req, aa);
+ if (rc == 0)
+ RETURN(0);
+ }
}
if (aa->aa_ocapa) {
else
cli->cl_r_in_flight--;
- async = list_empty(&aa->aa_oaps);
+ async = cfs_list_empty(&aa->aa_oaps);
if (!async) { /* from osc_send_oap_rpc() */
struct osc_async_page *oap, *tmp;
/* the caller may re-use the oap after the completion call so
* we need to clean it up a little */
- list_for_each_entry_safe(oap, tmp, &aa->aa_oaps, oap_rpc_item) {
- list_del_init(&oap->oap_rpc_item);
+ cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
+ oap_rpc_item) {
+ cfs_list_del_init(&oap->oap_rpc_item);
osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
}
OBDO_FREE(aa->aa_oa);
} else { /* from async_internal() */
- int i;
+ obd_count i;
for (i = 0; i < aa->aa_page_count; i++)
osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
}
osc_wake_cache_waiters(cli);
+ osc_wake_sync_fs(cli);
osc_check_rpcs(env, cli);
client_obd_list_unlock(&cli->cl_loi_list_lock);
if (!async)
cl_req_completion(env, aa->aa_clerq, rc);
osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
+
RETURN(rc);
}
static struct ptlrpc_request *osc_build_req(const struct lu_env *env,
struct client_obd *cli,
- struct list_head *rpc_list,
+ cfs_list_t *rpc_list,
int page_count, int cmd)
{
struct ptlrpc_request *req;
enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
struct ldlm_lock *lock = NULL;
struct cl_req_attr crattr;
- int i, rc;
+ int i, rc, mpflag = 0;
ENTRY;
- LASSERT(!list_empty(rpc_list));
+ LASSERT(!cfs_list_empty(rpc_list));
+
+ if (cmd & OBD_BRW_MEMALLOC)
+ mpflag = cfs_memory_pressure_get_and_set();
memset(&crattr, 0, sizeof crattr);
OBD_ALLOC(pga, sizeof(*pga) * page_count);
GOTO(out, req = ERR_PTR(-ENOMEM));
i = 0;
- list_for_each_entry(oap, rpc_list, oap_rpc_item) {
+ cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
struct cl_page *page = osc_oap2cl_page(oap);
if (ops == NULL) {
ops = oap->oap_caller_ops;
GOTO(out, req = ERR_PTR(rc));
}
+ if (cmd & OBD_BRW_MEMALLOC)
+ req->rq_memalloc = 1;
+
/* Need to update the timestamps after the request is built in case
* we race with setattr (locally or in queue at OST). If OST gets
* later setattr before earlier BRW (as determined by the request xid),
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
CFS_INIT_LIST_HEAD(&aa->aa_oaps);
- list_splice(rpc_list, &aa->aa_oaps);
+ cfs_list_splice(rpc_list, &aa->aa_oaps);
CFS_INIT_LIST_HEAD(rpc_list);
aa->aa_clerq = clerq;
out:
+ if (cmd & OBD_BRW_MEMALLOC)
+ cfs_memory_pressure_restore(mpflag);
+
capa_put(crattr.cra_capa);
if (IS_ERR(req)) {
if (oa)
/* this should happen rarely and is pretty bad, it makes the
* pending list not follow the dirty order */
client_obd_list_lock(&cli->cl_loi_list_lock);
- list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
- list_del_init(&oap->oap_rpc_item);
+ cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
+ cfs_list_del_init(&oap->oap_rpc_item);
/* queued sync pages can be torn down while the pages
* were between the pending list and the rpc */
/**
* prepare pages for ASYNC io and put pages in send queue.
*
- * \param cli -
- * \param loi -
- * \param cmd - OBD_BRW_* macroses
- * \param lop - pending pages
+ * \param cmd OBD_BRW_* macros
+ * \param lop pending pages
*
- * \return zero if pages successfully add to send queue.
- * \return not zere if error occurring.
+ * \return zero if no page added to send queue.
+ * \return 1 if pages successfully added to send queue.
+ * \return negative on errors.
*/
static int
osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
struct osc_brw_async_args *aa;
const struct obd_async_page_ops *ops;
CFS_LIST_HEAD(rpc_list);
+ CFS_LIST_HEAD(tmp_list);
unsigned int ending_offset;
unsigned starting_offset = 0;
- int srvlock = 0;
+ int srvlock = 0, mem_tight = 0;
struct cl_object *clob = NULL;
ENTRY;
- /* If there are HP OAPs we need to handle at least 1 of them,
- * move it the beginning of the pending list for that. */
- if (!list_empty(&lop->lop_urgent)) {
- oap = list_entry(lop->lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
+ /* ASYNC_HP pages first. At present, when the lock covering the pages
+ * is to be canceled, the pages covered by the lock will be sent out
+ * with ASYNC_HP. We have to send them out as soon as possible. */
+ cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
if (oap->oap_async_flags & ASYNC_HP)
- list_move(&oap->oap_pending_item, &lop->lop_pending);
+ cfs_list_move(&oap->oap_pending_item, &tmp_list);
+ else
+ cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
+ if (++page_count >= cli->cl_max_pages_per_rpc)
+ break;
}
+ cfs_list_splice(&tmp_list, &lop->lop_pending);
+ page_count = 0;
+
/* first we find the pages we're allowed to work with */
- list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
- oap_pending_item) {
+ cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_pending,
+ oap_pending_item) {
ops = oap->oap_caller_ops;
LASSERTF(oap->oap_magic == OAP_MAGIC, "Bad oap magic: oap %p, "
oap, oap->oap_brw_page.pg, (unsigned)!srvlock);
break;
}
+
+ /* If there is a gap at the start of this page, it can't merge
+ * with any previous page, so we'll hand the network a
+ * "fragmented" page array that it can't transfer in 1 RDMA */
+ if (page_count != 0 && oap->oap_page_off != 0)
+ break;
+
/* in llite being 'ready' equates to the page being locked
* until completion unlocks it. commit_write submits a page
* as not ready because its unlock will happen unconditionally
* as the call returns. if we race with commit_write giving
- * us that page we dont' want to create a hole in the page
+ * us that page we don't want to create a hole in the page
* stream, so we stop and leave the rpc to be fired by
* another dirtier or kupdated interval (the not ready page
* will still be on the dirty list). we could call in
case -EINTR:
/* the io isn't needed.. tell the checks
* below to complete the rpc with EINTR */
+ cfs_spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_COUNT_STABLE;
+ cfs_spin_unlock(&oap->oap_lock);
oap->oap_count = -EINTR;
break;
case 0:
+ cfs_spin_lock(&oap->oap_lock);
oap->oap_async_flags |= ASYNC_READY;
+ cfs_spin_unlock(&oap->oap_lock);
break;
default:
LASSERTF(0, "oap %p page %p returned %d "
}
}
#endif
- /* If there is a gap at the start of this page, it can't merge
- * with any previous page, so we'll hand the network a
- * "fragmented" page array that it can't transfer in 1 RDMA */
- if (page_count != 0 && oap->oap_page_off != 0)
- break;
/* take the page out of our book-keeping */
- list_del_init(&oap->oap_pending_item);
+ cfs_list_del_init(&oap->oap_pending_item);
lop_update_pending(cli, lop, cmd, -1);
- list_del_init(&oap->oap_urgent_item);
+ cfs_list_del_init(&oap->oap_urgent_item);
if (page_count == 0)
starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
}
/* now put the page back in our accounting */
- list_add_tail(&oap->oap_rpc_item, &rpc_list);
+ cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+ if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
+ mem_tight = 1;
if (page_count == 0)
srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
if (++page_count >= cli->cl_max_pages_per_rpc)
}
osc_wake_cache_waiters(cli);
-
+ osc_wake_sync_fs(cli);
loi_list_maint(cli, loi);
client_obd_list_unlock(&cli->cl_loi_list_lock);
RETURN(0);
}
- req = osc_build_req(env, cli, &rpc_list, page_count, cmd);
+ req = osc_build_req(env, cli, &rpc_list, page_count,
+ mem_tight ? (cmd | OBD_BRW_MEMALLOC) : cmd);
if (IS_ERR(req)) {
- LASSERT(list_empty(&rpc_list));
+ LASSERT(cfs_list_empty(&rpc_list));
loi_list_maint(cli, loi);
RETURN(PTR_ERR(req));
}
/* queued sync pages can be torn down while the pages
* were between the pending list and the rpc */
tmp = NULL;
- list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+ cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
/* only one oap gets a request reference */
if (tmp == NULL)
tmp = oap;
#define LOI_DEBUG(LOI, STR, args...) \
CDEBUG(D_INODE, "loi ready %d wr %d:%d rd %d:%d " STR, \
- !list_empty(&(LOI)->loi_ready_item) || \
- !list_empty(&(LOI)->loi_hp_ready_item), \
+ !cfs_list_empty(&(LOI)->loi_ready_item) || \
+ !cfs_list_empty(&(LOI)->loi_hp_ready_item), \
(LOI)->loi_write_lop.lop_num_pending, \
- !list_empty(&(LOI)->loi_write_lop.lop_urgent), \
+ !cfs_list_empty(&(LOI)->loi_write_lop.lop_urgent), \
(LOI)->loi_read_lop.lop_num_pending, \
- !list_empty(&(LOI)->loi_read_lop.lop_urgent), \
+ !cfs_list_empty(&(LOI)->loi_read_lop.lop_urgent), \
args) \
/* This is called by osc_check_rpcs() to find which objects have pages that
/* First return objects that have blocked locks so that they
* will be flushed quickly and other clients can get the lock,
* then objects which have pages ready to be stuffed into RPCs */
- if (!list_empty(&cli->cl_loi_hp_ready_list))
- RETURN(list_entry(cli->cl_loi_hp_ready_list.next,
- struct lov_oinfo, loi_hp_ready_item));
- if (!list_empty(&cli->cl_loi_ready_list))
- RETURN(list_entry(cli->cl_loi_ready_list.next,
- struct lov_oinfo, loi_ready_item));
+ if (!cfs_list_empty(&cli->cl_loi_hp_ready_list))
+ RETURN(cfs_list_entry(cli->cl_loi_hp_ready_list.next,
+ struct lov_oinfo, loi_hp_ready_item));
+ if (!cfs_list_empty(&cli->cl_loi_ready_list))
+ RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
+ struct lov_oinfo, loi_ready_item));
+ if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
+ RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
+ struct lov_oinfo, loi_sync_fs_item));
/* then if we have cache waiters, return all objects with queued
* writes. This is especially important when many small files
* have filled up the cache and not been fired into rpcs because
* they don't pass the nr_pending/object threshhold */
- if (!list_empty(&cli->cl_cache_waiters) &&
- !list_empty(&cli->cl_loi_write_list))
- RETURN(list_entry(cli->cl_loi_write_list.next,
- struct lov_oinfo, loi_write_item));
+ if (!cfs_list_empty(&cli->cl_cache_waiters) &&
+ !cfs_list_empty(&cli->cl_loi_write_list))
+ RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
+ struct lov_oinfo, loi_write_item));
/* then return all queued objects when we have an invalid import
* so that they get flushed */
if (cli->cl_import == NULL || cli->cl_import->imp_invalid) {
- if (!list_empty(&cli->cl_loi_write_list))
- RETURN(list_entry(cli->cl_loi_write_list.next,
- struct lov_oinfo, loi_write_item));
- if (!list_empty(&cli->cl_loi_read_list))
- RETURN(list_entry(cli->cl_loi_read_list.next,
- struct lov_oinfo, loi_read_item));
+ if (!cfs_list_empty(&cli->cl_loi_write_list))
+ RETURN(cfs_list_entry(cli->cl_loi_write_list.next,
+ struct lov_oinfo,
+ loi_write_item));
+ if (!cfs_list_empty(&cli->cl_loi_read_list))
+ RETURN(cfs_list_entry(cli->cl_loi_read_list.next,
+ struct lov_oinfo, loi_read_item));
}
RETURN(NULL);
}
struct osc_async_page *oap;
int hprpc = 0;
- if (!list_empty(&loi->loi_write_lop.lop_urgent)) {
- oap = list_entry(loi->loi_write_lop.lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
+ if (!cfs_list_empty(&loi->loi_write_lop.lop_urgent)) {
+ oap = cfs_list_entry(loi->loi_write_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
hprpc = !!(oap->oap_async_flags & ASYNC_HP);
}
- if (!hprpc && !list_empty(&loi->loi_read_lop.lop_urgent)) {
- oap = list_entry(loi->loi_write_lop.lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
+ if (!hprpc && !cfs_list_empty(&loi->loi_read_lop.lop_urgent)) {
+ oap = cfs_list_entry(loi->loi_read_lop.lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
hprpc = !!(oap->oap_async_flags & ASYNC_HP);
}
if (lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)) {
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_WRITE,
&loi->loi_write_lop);
- if (rc < 0)
- break;
+ if (rc < 0) {
+ CERROR("Write request failed with %d\n", rc);
+
+ /* osc_send_oap_rpc failed, mostly because of
+ * memory pressure.
+ *
+ * It can't break here, because if:
+ * - a page was submitted by osc_io_submit, so
+ * page locked;
+ * - no request in flight
+ * - no subsequent request
+ * The system will be in live-lock state,
+ * because there is no chance to call
+ * osc_io_unplug() and osc_check_rpcs() any
+ * more. pdflush can't help in this case,
+ * because it might be blocked at grabbing
+ * the page lock as we mentioned.
+ *
+ * Anyway, continue to drain pages. */
+ /* break; */
+ }
+
if (rc > 0)
race_counter = 0;
else
rc = osc_send_oap_rpc(env, cli, loi, OBD_BRW_READ,
&loi->loi_read_lop);
if (rc < 0)
- break;
+ CERROR("Read request failed with %d\n", rc);
+
if (rc > 0)
race_counter = 0;
else
race_counter++;
}
- /* attempt some inter-object balancing by issueing rpcs
+ /* attempt some inter-object balancing by issuing rpcs
* for each object in turn */
- if (!list_empty(&loi->loi_hp_ready_item))
- list_del_init(&loi->loi_hp_ready_item);
- if (!list_empty(&loi->loi_ready_item))
- list_del_init(&loi->loi_ready_item);
- if (!list_empty(&loi->loi_write_item))
- list_del_init(&loi->loi_write_item);
- if (!list_empty(&loi->loi_read_item))
- list_del_init(&loi->loi_read_item);
+ if (!cfs_list_empty(&loi->loi_hp_ready_item))
+ cfs_list_del_init(&loi->loi_hp_ready_item);
+ if (!cfs_list_empty(&loi->loi_ready_item))
+ cfs_list_del_init(&loi->loi_ready_item);
+ if (!cfs_list_empty(&loi->loi_write_item))
+ cfs_list_del_init(&loi->loi_write_item);
+ if (!cfs_list_empty(&loi->loi_read_item))
+ cfs_list_del_init(&loi->loi_read_item);
+ if (!cfs_list_empty(&loi->loi_sync_fs_item))
+ cfs_list_del_init(&loi->loi_sync_fs_item);
loi_list_maint(cli, loi);
int rc;
ENTRY;
client_obd_list_lock(&cli->cl_loi_list_lock);
- rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
+ rc = cfs_list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
RETURN(rc);
};
osc_consume_write_grant(cli, &oap->oap_brw_page);
if (transient) {
cli->cl_dirty_transit += CFS_PAGE_SIZE;
- atomic_inc(&obd_dirty_transit_pages);
+ cfs_atomic_inc(&obd_dirty_transit_pages);
oap->oap_brw_flags |= OBD_BRW_NOCACHE;
}
}
ENTRY;
CDEBUG(D_CACHE, "dirty: %ld/%d dirty_max: %ld/%d dropped: %lu "
- "grant: %lu\n", cli->cl_dirty, atomic_read(&obd_dirty_pages),
+ "grant: %lu\n", cli->cl_dirty, cfs_atomic_read(&obd_dirty_pages),
cli->cl_dirty_max, obd_max_dirty_pages,
cli->cl_lost_grant, cli->cl_avail_grant);
/* Hopefully normal case - cache space and write credits available */
if (cli->cl_dirty + CFS_PAGE_SIZE <= cli->cl_dirty_max &&
- atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
+ cfs_atomic_read(&obd_dirty_pages) + 1 <= obd_max_dirty_pages &&
osc_enter_cache_try(env, cli, loi, oap, 0))
RETURN(0);
- /* Make sure that there are write rpcs in flight to wait for. This
- * is a little silly as this object may not have any pending but
- * other objects sure might. */
- if (cli->cl_w_in_flight) {
- list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
+ /* It is safe to block as a cache waiter as long as there is grant
+ * space available or the hope of additional grant being returned
+ * when an in flight write completes. Using the write back cache
+ * if possible is preferable to sending the data synchronously
+ * because write pages can then be merged in to large requests.
+ * The addition of this cache waiter will cause pending write
+ * pages to be sent immediately. */
+ if (cli->cl_w_in_flight || cli->cl_avail_grant >= CFS_PAGE_SIZE) {
+ cfs_list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
cfs_waitq_init(&ocw.ocw_waitq);
ocw.ocw_oap = oap;
ocw.ocw_rc = 0;
l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
client_obd_list_lock(&cli->cl_loi_list_lock);
- if (!list_empty(&ocw.ocw_entry)) {
- list_del(&ocw.ocw_entry);
+ if (!cfs_list_empty(&ocw.ocw_entry)) {
+ cfs_list_del(&ocw.ocw_entry);
RETURN(-EINTR);
}
RETURN(ocw.ocw_rc);
ENTRY;
if (!page)
- return size_round(sizeof(*oap));
+ return cfs_size_round(sizeof(*oap));
oap = *res;
oap->oap_magic = OAP_MAGIC;
CFS_INIT_LIST_HEAD(&oap->oap_rpc_item);
CFS_INIT_LIST_HEAD(&oap->oap_page_list);
- spin_lock_init(&oap->oap_lock);
+ cfs_spin_lock_init(&oap->oap_lock);
CDEBUG(D_CACHE, "oap %p page %p obj off "LPU64"\n", oap, page, offset);
RETURN(0);
}
if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
RETURN(-EIO);
- if (!list_empty(&oap->oap_pending_item) ||
- !list_empty(&oap->oap_urgent_item) ||
- !list_empty(&oap->oap_rpc_item))
+ if (!cfs_list_empty(&oap->oap_pending_item) ||
+ !cfs_list_empty(&oap->oap_urgent_item) ||
+ !cfs_list_empty(&oap->oap_rpc_item))
RETURN(-EBUSY);
/* check if the file's owner/group is over quota */
if ((cmd & OBD_BRW_WRITE) && !(cmd & OBD_BRW_NOQUOTA)) {
struct cl_object *obj;
struct cl_attr attr; /* XXX put attr into thread info */
+ unsigned int qid[MAXQUOTAS];
obj = cl_object_top(osc_oap2cl_page(oap)->cp_obj);
rc = cl_object_attr_get(env, obj, &attr);
cl_object_attr_unlock(obj);
- if (rc == 0 && lquota_chkdq(quota_interface, cli, attr.cat_uid,
- attr.cat_gid) == NO_QUOTA)
+ qid[USRQUOTA] = attr.cat_uid;
+ qid[GRPQUOTA] = attr.cat_gid;
+ if (rc == 0 &&
+ lquota_chkdq(quota_interface, cli, qid) == NO_QUOTA)
rc = -EDQUOT;
if (rc)
RETURN(rc);
oap->oap_page_off = off;
oap->oap_count = count;
oap->oap_brw_flags = brw_flags;
+ /* Give a hint to OST that requests are coming from kswapd - bug19529 */
+ if (cfs_memory_pressure_get())
+ oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
+ cfs_spin_lock(&oap->oap_lock);
oap->oap_async_flags = async_flags;
+ cfs_spin_unlock(&oap->oap_lock);
if (cmd & OBD_BRW_WRITE) {
rc = osc_enter_cache(env, cli, loi, oap);
obd_flag async_flags)
{
struct loi_oap_pages *lop;
+ int flags = 0;
ENTRY;
- if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
- RETURN(-EIO);
+ LASSERT(!cfs_list_empty(&oap->oap_pending_item));
if (oap->oap_cmd & OBD_BRW_WRITE) {
lop = &loi->loi_write_lop;
lop = &loi->loi_read_lop;
}
- if (list_empty(&oap->oap_pending_item))
- RETURN(-EINVAL);
-
if ((oap->oap_async_flags & async_flags) == async_flags)
RETURN(0);
+ /* XXX: This introduces a tiny, insignificant race in the case where
+ * this loi already had other urgent items.
+ */
+ if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
+ cfs_list_empty(&oap->oap_rpc_item) &&
+ cfs_list_empty(&oap->oap_urgent_item)) {
+ cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+ flags |= ASYNC_SYNCFS;
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags |= flags;
+ cfs_spin_unlock(&oap->oap_lock);
+ loi_list_maint(cli, loi);
+ RETURN(0);
+ }
+
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
- oap->oap_async_flags |= ASYNC_READY;
+ flags |= ASYNC_READY;
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
- list_empty(&oap->oap_rpc_item)) {
+ cfs_list_empty(&oap->oap_rpc_item)) {
if (oap->oap_async_flags & ASYNC_HP)
- list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+ cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
else
- list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
- oap->oap_async_flags |= ASYNC_URGENT;
+ cfs_list_add_tail(&oap->oap_urgent_item,
+ &lop->lop_urgent);
+ flags |= ASYNC_URGENT;
loi_list_maint(cli, loi);
}
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags |= flags;
+ cfs_spin_unlock(&oap->oap_lock);
LOI_DEBUG(loi, "oap %p page %p has flags %x\n", oap, oap->oap_page,
oap->oap_async_flags);
client_obd_list_lock(&cli->cl_loi_list_lock);
- if (!list_empty(&oap->oap_rpc_item))
+ if (!cfs_list_empty(&oap->oap_rpc_item))
GOTO(out, rc = -EBUSY);
osc_exit_cache(cli, oap, 0);
osc_wake_cache_waiters(cli);
- if (!list_empty(&oap->oap_urgent_item)) {
- list_del_init(&oap->oap_urgent_item);
- oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
+ if (!cfs_list_empty(&oap->oap_urgent_item)) {
+ cfs_list_del_init(&oap->oap_urgent_item);
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP |
+ ASYNC_SYNCFS);
+ cfs_spin_unlock(&oap->oap_lock);
}
- if (!list_empty(&oap->oap_pending_item)) {
- list_del_init(&oap->oap_pending_item);
+ if (!cfs_list_empty(&oap->oap_pending_item)) {
+ cfs_list_del_init(&oap->oap_pending_item);
lop_update_pending(cli, lop, oap->oap_cmd, -1);
}
loi_list_maint(cli, loi);
LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
lock_res_and_lock(lock);
- spin_lock(&osc_ast_guard);
+ cfs_spin_lock(&osc_ast_guard);
LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
lock->l_ast_data = data;
- spin_unlock(&osc_ast_guard);
+ cfs_spin_unlock(&osc_ast_guard);
unlock_res_and_lock(lock);
}
struct ldlm_res_id res_id;
struct obd_device *obd = class_exp2obd(exp);
- osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_gr, &res_id);
+ osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
return 0;
}
+/**
+ * Check whether any ldlm lock exists on this object's resource in the OSC.
+ *
+ * The resource name is built from lsm_object_id/lsm_object_seq, and the
+ * \a replace iterator is run with \a data over that resource in the
+ * namespace of the obd device behind \a exp.
+ *
+ * \retval 0   the iteration returned LDLM_ITER_CONTINUE (nothing found)
+ * \retval 1   the iteration returned LDLM_ITER_STOP (one was found)
+ * \retval rc  any other iteration result is passed through unchanged
+ *             (expected to be a negative error)
+ */
+static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
+                           ldlm_iterator_t replace, void *data)
+{
+        struct obd_device *obd = class_exp2obd(exp);
+        struct ldlm_res_id res_id;
+        int iter_rc;
+
+        osc_build_res_name(lsm->lsm_object_id, lsm->lsm_object_seq, &res_id);
+        iter_rc = ldlm_resource_iterate(obd->obd_namespace, &res_id,
+                                        replace, data);
+
+        /* Translate the iterator verdict into the 0/1 contract above. */
+        if (iter_rc == LDLM_ITER_STOP)
+                return 1;
+        if (iter_rc == LDLM_ITER_CONTINUE)
+                return 0;
+
+        return iter_rc;
+}
+
static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
obd_enqueue_update_f upcall, void *cookie,
int *flags, int rc)
* osc_enqueue_fini(). */
ldlm_lock_addref(&handle, mode);
+ /* Let the CP AST grant the lock first. */
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
+
/* Complete obtaining the lock procedure. */
rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
mode, aa->oa_flags, aa->oa_lvb,
- sizeof(*aa->oa_lvb), lustre_swab_ost_lvb,
- &handle, rc);
+ sizeof(*aa->oa_lvb), &handle, rc);
/* Complete osc stuff. */
rc = osc_enqueue_fini(req, aa->oa_lvb,
aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc);
RETURN(-ENOMEM);
rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
- if (rc)
+ if (rc) {
+ ptlrpc_request_free(req);
RETURN(rc);
+ }
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
sizeof *lvb);
*flags &= ~LDLM_FL_BLOCK_GRANTED;
rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
- sizeof(*lvb), lustre_swab_ost_lvb, lockh, async);
+ sizeof(*lvb), lockh, async);
if (rqset) {
if (!rc) {
struct osc_enqueue_args *aa;
ENTRY;
osc_build_res_name(oinfo->oi_md->lsm_object_id,
- oinfo->oi_md->lsm_object_gr, &res_id);
+ oinfo->oi_md->lsm_object_seq, &res_id);
rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
&oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
rc = mode;
if (mode == LCK_PR)
rc |= LCK_PW;
- rc = ldlm_lock_match(obd->obd_namespace, lflags | LDLM_FL_LVB_READY,
+ rc = ldlm_lock_match(obd->obd_namespace, lflags,
res_id, type, policy, rc, lockh, unref);
if (rc) {
if (data != NULL)
}
static int osc_cancel_unused(struct obd_export *exp,
- struct lov_stripe_md *lsm, int flags,
+ struct lov_stripe_md *lsm,
+ ldlm_cancel_flags_t flags,
void *opaque)
{
struct obd_device *obd = class_exp2obd(exp);
if (lsm != NULL) {
resp = osc_build_res_name(lsm->lsm_object_id,
- lsm->lsm_object_gr, &res_id);
+ lsm->lsm_object_seq, &res_id);
}
return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
struct ptlrpc_request *req,
struct osc_async_args *aa, int rc)
{
+ struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
struct obd_statfs *msfs;
+ __u64 used;
ENTRY;
+ if (rc == -EBADR)
+ /* The request has in fact never been sent
+ * due to issues at a higher level (LOV).
+ * Exit immediately since the caller is
+ * aware of the problem and takes care
+ * of the clean up */
+ RETURN(rc);
+
if ((rc == -ENOTCONN || rc == -EAGAIN) &&
(aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
GOTO(out, rc = 0);
GOTO(out, rc = -EPROTO);
}
+ /* Reinitialize the RDONLY and DEGRADED flags at the client
+ * on each statfs, so they don't stay set permanently. */
+ cfs_spin_lock(&cli->cl_oscc.oscc_lock);
+
+ if (unlikely(msfs->os_state & OS_STATE_DEGRADED))
+ cli->cl_oscc.oscc_flags |= OSCC_FLAG_DEGRADED;
+ else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_DEGRADED))
+ cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_DEGRADED;
+
+ if (unlikely(msfs->os_state & OS_STATE_READONLY))
+ cli->cl_oscc.oscc_flags |= OSCC_FLAG_RDONLY;
+ else if (unlikely(cli->cl_oscc.oscc_flags & OSCC_FLAG_RDONLY))
+ cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_RDONLY;
+
+ /* Add a bit of hysteresis so this flag isn't continually flapping,
+ * and ensure that new files don't get extremely fragmented due to
+ * only a small amount of available space in the filesystem.
+ * We want to set the NOSPC flag when there is less than ~0.1% free
+ * and clear it when there is at least ~0.2% free space, so:
+ * avail < ~0.1% max max = avail + used
+ * 1025 * avail < avail + used used = blocks - free
+ * 1024 * avail < used
+ * 1024 * avail < blocks - free
+ * avail < ((blocks - free) >> 10)
+ *
+ * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
+ * lose that amount of space so in those cases we report no space left
+ * if there is less than 1 GB left. */
+ used = min_t(__u64,(msfs->os_blocks - msfs->os_bfree) >> 10, 1 << 30);
+ if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) == 0) &&
+ ((msfs->os_ffree < 32) || (msfs->os_bavail < used))))
+ cli->cl_oscc.oscc_flags |= OSCC_FLAG_NOSPC;
+ else if (unlikely(((cli->cl_oscc.oscc_flags & OSCC_FLAG_NOSPC) != 0) &&
+ (msfs->os_ffree > 64) && (msfs->os_bavail > (used << 1))))
+ cli->cl_oscc.oscc_flags &= ~OSCC_FLAG_NOSPC;
+
+ cfs_spin_unlock(&cli->cl_oscc.oscc_lock);
+
*aa->aa_oi->oi_osfs = *msfs;
out:
rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
/*Since the request might also come from lprocfs, so we need
*sync this with client_disconnect_export Bug15684*/
- down_read(&obd->u.cli.cl_sem);
+ cfs_down_read(&obd->u.cli.cl_sem);
if (obd->u.cli.cl_import)
imp = class_import_get(obd->u.cli.cl_import);
- up_read(&obd->u.cli.cl_sem);
+ cfs_up_read(&obd->u.cli.cl_sem);
if (!imp)
RETURN(-ENODEV);
/* we only need the header part from user space to get lmm_magic and
* lmm_stripe_count, (the header part is common to v1 and v3) */
lum_size = sizeof(struct lov_user_md_v1);
- if (copy_from_user(&lum, lump, lum_size))
+ if (cfs_copy_from_user(&lum, lump, lum_size))
RETURN(-EFAULT);
if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
}
lumk->lmm_object_id = lsm->lsm_object_id;
- lumk->lmm_object_gr = lsm->lsm_object_gr;
+ lumk->lmm_object_seq = lsm->lsm_object_seq;
lumk->lmm_stripe_count = 1;
- if (copy_to_user(lump, lumk, lum_size))
+ if (cfs_copy_to_user(lump, lumk, lum_size))
rc = -EFAULT;
if (lumk != &lum)
int err = 0;
ENTRY;
- if (!try_module_get(THIS_MODULE)) {
+ if (!cfs_try_module_get(THIS_MODULE)) {
CERROR("Can't get module. Is it alive?");
return -EINVAL;
}
memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
- err = copy_to_user((void *)uarg, buf, len);
+ err = cfs_copy_to_user((void *)uarg, buf, len);
if (err)
err = -EFAULT;
obd_ioctl_freedata(buf, len);
GOTO(out, err = -ENOTTY);
}
out:
- module_put(THIS_MODULE);
+ cfs_module_put(THIS_MODULE);
return err;
}
/* XXX return an error? skip setting below flags? */
}
- spin_lock(&imp->imp_lock);
+ cfs_spin_lock(&imp->imp_lock);
imp->imp_server_timeout = 1;
imp->imp_pingable = 1;
- spin_unlock(&imp->imp_lock);
+ cfs_spin_unlock(&imp->imp_lock);
CDEBUG(D_RPCTRACE, "pinging OST %s\n", obd2cli_tgt(imp->imp_obd));
RETURN(rc);
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
if (KEY_IS(KEY_NEXT_ID)) {
+ obd_id new_val;
+ struct osc_creator *oscc = &obd->u.cli.cl_oscc;
+
if (vallen != sizeof(obd_id))
RETURN(-ERANGE);
if (val == NULL)
RETURN(-EINVAL);
- obd->u.cli.cl_oscc.oscc_next_id = *((obd_id*)val) + 1;
+
+ if (vallen != sizeof(obd_id))
+ RETURN(-EINVAL);
+
+ /* avoid a race between allocating a new object and setting the
+ * next id from the ll_sync thread */
+ cfs_spin_lock(&oscc->oscc_lock);
+ new_val = *((obd_id*)val) + 1;
+ if (new_val > oscc->oscc_next_id)
+ oscc->oscc_next_id = new_val;
+ cfs_spin_unlock(&oscc->oscc_lock);
CDEBUG(D_HA, "%s: set oscc_next_id = "LPU64"\n",
exp->exp_obd->obd_name,
obd->u.cli.cl_oscc.oscc_next_id);
RETURN(0);
}
- if (KEY_IS(KEY_UNLINKED)) {
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
- spin_lock(&oscc->oscc_lock);
- oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
- spin_unlock(&oscc->oscc_lock);
- RETURN(0);
- }
-
- if (KEY_IS(KEY_INIT_RECOV)) {
- if (vallen != sizeof(int))
- RETURN(-EINVAL);
- spin_lock(&imp->imp_lock);
- imp->imp_initial_recov = *(int *)val;
- spin_unlock(&imp->imp_lock);
- CDEBUG(D_HA, "%s: set imp_initial_recov = %d\n",
- exp->exp_obd->obd_name,
- imp->imp_initial_recov);
- RETURN(0);
- }
-
if (KEY_IS(KEY_CHECKSUM)) {
if (vallen != sizeof(int))
RETURN(-EINVAL);
if (!set && !KEY_IS(KEY_GRANT_SHRINK))
RETURN(-EINVAL);
- /* If OST understood OBD_CONNECT_MDS we don't need to tell it we
- * are the MDS again. Just do the local setup. b=16839 */
- if (KEY_IS(KEY_MDS_CONN) &&
- (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MDS))
- RETURN(osc_setinfo_mds_connect_import(imp));
-
/* We pass all other commands directly to OST. Since nobody calls osc
methods directly and everybody is supposed to go through LOV, we
assume lov checked invalid values for us.
Even if something bad goes through, we'd get a -EINVAL from OST
anyway. */
- if (KEY_IS(KEY_GRANT_SHRINK))
- req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
- else
- req = ptlrpc_request_alloc(imp, &RQF_OST_SET_INFO);
-
- if (req == NULL)
+ if (KEY_IS(KEY_GRANT_SHRINK))
+ req = ptlrpc_request_alloc(imp, &RQF_OST_SET_GRANT_INFO);
+ else
+ req = ptlrpc_request_alloc(imp, &RQF_OBD_SET_INFO);
+
+ if (req == NULL)
RETURN(-ENOMEM);
req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
if (KEY_IS(KEY_MDS_CONN)) {
struct osc_creator *oscc = &obd->u.cli.cl_oscc;
- oscc->oscc_oa.o_gr = (*(__u32 *)val);
+ oscc->oscc_oa.o_seq = (*(__u32 *)val);
oscc->oscc_oa.o_valid |= OBD_MD_FLGROUP;
- LASSERT_MDS_GROUP(oscc->oscc_oa.o_gr);
+ LASSERT_SEQ_IS_MDT(oscc->oscc_oa.o_seq);
req->rq_no_delay = req->rq_no_resend = 1;
req->rq_interpret_reply = osc_setinfo_mds_conn_interpret;
} else if (KEY_IS(KEY_GRANT_SHRINK)) {
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
- OBD_ALLOC_PTR(oa);
+ OBDO_ALLOC(oa);
if (!oa) {
ptlrpc_req_finished(req);
RETURN(-ENOMEM);
}
*oa = ((struct ost_body *)val)->oa;
aa->aa_oa = oa;
- req->rq_interpret_reply = osc_shrink_grant_interpret;
- }
-
- ptlrpc_request_set_replen(req);
- if (!KEY_IS(KEY_GRANT_SHRINK)) {
- LASSERT(set != NULL);
- ptlrpc_set_add_req(set, req);
- ptlrpc_check_set(NULL, set);
- } else
- ptlrpcd_add_req(req, PSCOPE_OTHER);
-
- RETURN(0);
+ req->rq_interpret_reply = osc_shrink_grant_interpret;
+ }
+
+ ptlrpc_request_set_replen(req);
+ if (!KEY_IS(KEY_GRANT_SHRINK)) {
+ LASSERT(set != NULL);
+ ptlrpc_set_add_req(set, req);
+ ptlrpc_check_set(NULL, set);
+ } else
+ ptlrpcd_add_req(req, PSCOPE_OTHER);
+
+ RETURN(0);
}
};
static struct llog_operations osc_mds_ost_orig_logops;
-static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
- struct obd_device *tgt, int count,
- struct llog_catid *catid, struct obd_uuid *uuid)
+
+static int __osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
+ struct obd_device *tgt, struct llog_catid *catid)
{
int rc;
ENTRY;
- LASSERT(olg == &obd->obd_olg);
- spin_lock(&obd->obd_dev_lock);
- if (osc_mds_ost_orig_logops.lop_setup != llog_obd_origin_setup) {
- osc_mds_ost_orig_logops = llog_lvfs_ops;
- osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
- osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
- osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
- osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
- }
- spin_unlock(&obd->obd_dev_lock);
-
- rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, count,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, tgt, 1,
&catid->lci_logid, &osc_mds_ost_orig_logops);
if (rc) {
CERROR("failed LLOG_MDS_OST_ORIG_CTXT\n");
GOTO(out, rc);
}
- rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, count,
+ rc = llog_setup(obd, &obd->obd_olg, LLOG_SIZE_REPL_CTXT, tgt, 1,
NULL, &osc_size_repl_logops);
if (rc) {
struct llog_ctxt *ctxt =
GOTO(out, rc);
out:
if (rc) {
- CERROR("osc '%s' tgt '%s' cnt %d catid %p rc=%d\n",
- obd->obd_name, tgt->obd_name, count, catid, rc);
+ CERROR("osc '%s' tgt '%s' catid %p rc=%d\n",
+ obd->obd_name, tgt->obd_name, catid, rc);
CERROR("logid "LPX64":0x%x\n",
catid->lci_logid.lgl_oid, catid->lci_logid.lgl_ogen);
}
return rc;
}
+static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
+ struct obd_device *disk_obd, int *index)
+{ /*
+ * Initialise this OSC's llog contexts from the catalog entry stored at
+ * slot *index.
+ *
+ * Sequence, all performed while holding olg->olg_cat_processing:
+ *   1. llog_get_cat_list() fills catid for slot *index (count 1);
+ *   2. __osc_llog_init() sets up the llog contexts using that catid;
+ *   3. llog_put_cat_list() is called with the same slot and catid
+ *      (presumably persisting any id assigned in step 2 - confirm).
+ *
+ * \param obd       OSC device whose obd_olg is being initialised
+ * \param olg       llog group; must be &obd->obd_olg (asserted below)
+ * \param disk_obd  device handed to the catalog-list helpers and to
+ *                  __osc_llog_init()
+ * \param index     pointer to the catalog slot number; only read here
+ *
+ * \retval 0        all three steps succeeded
+ * \retval other    return code of the first step that failed
+ *
+ * NOTE(review): the function opens with ENTRY but leaves through a plain
+ * "return" instead of RETURN(); confirm whether that is intentional.
+ */
+ struct llog_catid catid; /* single catalog entry read in step 1 */
+ static char name[32] = CATLIST; /* static: one buffer shared by all calls */
+ int rc;
+ ENTRY;
+
+ LASSERT(olg == &obd->obd_olg);
+
+ cfs_mutex_down(&olg->olg_cat_processing); /* released at "out" on every path */
+ rc = llog_get_cat_list(disk_obd, name, *index, 1, &catid);
+ if (rc) {
+ CERROR("rc: %d\n", rc);
+ GOTO(out, rc);
+ }
+
+ CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
+ obd->obd_name, *index, catid.lci_logid.lgl_oid,
+ catid.lci_logid.lgl_oseq, catid.lci_logid.lgl_ogen);
+
+ rc = __osc_llog_init(obd, olg, disk_obd, &catid);
+ if (rc) {
+ CERROR("rc: %d\n", rc);
+ GOTO(out, rc);
+ }
+
+ rc = llog_put_cat_list(disk_obd, name, *index, 1, &catid);
+ if (rc) {
+ CERROR("rc: %d\n", rc);
+ GOTO(out, rc);
+ }
+
+ out: /* common exit: reached by fall-through on success and by every GOTO */
+ cfs_mutex_up(&olg->olg_cat_processing);
+
+ return rc;
+}
+
static int osc_llog_finish(struct obd_device *obd, int count)
{
struct llog_ctxt *ctxt;
long lost_grant;
client_obd_list_lock(&cli->cl_loi_list_lock);
- data->ocd_grant = cli->cl_avail_grant ?:
+ data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
lost_grant = cli->cl_lost_grant;
cli->cl_lost_grant = 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
- "cl_lost_grant: %ld\n", data->ocd_grant,
- cli->cl_avail_grant, lost_grant);
+ "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
+ cli->cl_avail_grant, cli->cl_dirty, lost_grant);
CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
" ocd_grant: %d\n", data->ocd_connect_flags,
data->ocd_version, data->ocd_grant);
* causes the following problem if setup (connect) and cleanup
* (disconnect) are tangled together.
* connect p1 disconnect p2
- * ptlrpc_connect_import
+ * ptlrpc_connect_import
* ............... class_manual_cleanup
* osc_disconnect
* del_shrink_grant
* ptlrpc_connect_interrupt
* init_grant_shrink
- * add this client to shrink list
+ * add this client to shrink list
* cleanup_osc
* Bang! pinger trigger the shrink.
* So the osc should be disconnected from the shrink list, after we
- * are sure the import has been destroyed. BUG18662
+ * are sure the import has been destroyed. BUG18662
*/
if (obd->u.cli.cl_import == NULL)
osc_del_shrink_grant(&obd->u.cli);
if (imp->imp_server_timeout) {
struct osc_creator *oscc = &obd->u.cli.cl_oscc;
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
}
cli = &obd->u.cli;
client_obd_list_lock(&cli->cl_loi_list_lock);
if (imp->imp_server_timeout) {
struct osc_creator *oscc = &obd->u.cli.cl_oscc;
- spin_lock(&oscc->oscc_lock);
+ cfs_spin_lock(&oscc->oscc_lock);
oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
- spin_unlock(&oscc->oscc_lock);
+ cfs_spin_unlock(&oscc->oscc_lock);
}
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
break;
RETURN(rc);
}
+/**
+ * Determine whether the lock can be canceled before replaying the lock
+ * during recovery, see bug16774 for detailed information.
+ *
+ * check_res_locked() is invoked on the lock's resource before anything
+ * else (presumably asserting that the caller already holds the resource
+ * lock - confirm against its definition).
+ *
+ * \param lock  the DLM lock under consideration
+ *
+ * \retval zero the lock can't be canceled
+ * \retval other ok to cancel (this implementation returns 1)
+ *
+ * NOTE(review): the function uses RETURN() without a matching ENTRY;
+ * confirm whether that is intentional.
+ */
+static int osc_cancel_for_recovery(struct ldlm_lock *lock)
+{
+ check_res_locked(lock->l_resource);
+
+ /*
+ * Cancel all unused extent locks in granted mode LCK_PR or LCK_CR.
+ * "Unused" is decided by osc_dlm_lock_pageref() returning 0.
+ *
+ * XXX as a future improvement, we can also cancel unused write locks
+ * if they don't have dirty data and active mmaps.
+ */
+ if (lock->l_resource->lr_type == LDLM_EXTENT &&
+ (lock->l_granted_mode == LCK_PR ||
+ lock->l_granted_mode == LCK_CR) &&
+ (osc_dlm_lock_pageref(lock) == 0)) /* evaluated last, only for PR/CR extent locks */
+ RETURN(1);
+
+ RETURN(0); /* any other lock type or mode, or a lock still referenced */
+}
+
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
int rc;
struct lprocfs_static_vars lvars = { 0 };
struct client_obd *cli = &obd->u.cli;
+ cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
lprocfs_osc_init_vars(&lvars);
if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
lproc_osc_attach_seqstat(obd);
ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
OST_MAXREQSIZE,
ptlrpc_add_rqs_to_pool);
-
+
CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
- sema_init(&cli->cl_grant_sem, 1);
+ cfs_sema_init(&cli->cl_grant_sem, 1);
+
+ ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
}
RETURN(rc);
CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
/* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
ptlrpc_deactivate_import(imp);
- spin_lock(&imp->imp_lock);
+ cfs_spin_lock(&imp->imp_lock);
imp->imp_pingable = 0;
- spin_unlock(&imp->imp_lock);
+ cfs_spin_unlock(&imp->imp_lock);
break;
}
case OBD_CLEANUP_EXPORTS: {
client import will not have been cleaned. */
if (obd->u.cli.cl_import) {
struct obd_import *imp;
- down_write(&obd->u.cli.cl_sem);
+ cfs_down_write(&obd->u.cli.cl_sem);
imp = obd->u.cli.cl_import;
CDEBUG(D_CONFIG, "%s: client import never connected\n",
obd->obd_name);
imp->imp_rq_pool = NULL;
}
class_destroy_import(imp);
- up_write(&obd->u.cli.cl_sem);
+ cfs_up_write(&obd->u.cli.cl_sem);
obd->u.cli.cl_import = NULL;
}
rc = obd_llog_finish(obd, 0);
if (rc != 0)
CERROR("failed to cleanup llogging subsystems\n");
break;
- }
+ }
}
RETURN(rc);
}
int osc_cleanup(struct obd_device *obd)
{
- struct osc_creator *oscc = &obd->u.cli.cl_oscc;
int rc;
ENTRY;
ptlrpc_lprocfs_unregister_obd(obd);
lprocfs_obd_cleanup(obd);
- spin_lock(&oscc->oscc_lock);
- oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
- oscc->oscc_flags |= OSCC_FLAG_EXITING;
- spin_unlock(&oscc->oscc_lock);
-
/* free memory of osc quota cache */
lquota_cleanup(quota_interface, obd);
default:
rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
lcfg, obd);
- if (rc > 0)
- rc = 0;
+ if (rc > 0)
+ rc = 0;
break;
}
return(rc);
}
+static int osc_sync_fs(struct obd_device *obd, struct obd_info *oinfo,
+ int wait)
+{ /*
+ * Flag every pending write page of this OSC with ASYNC_SYNCFS and kick
+ * the RPC engine.
+ *
+ * The request is recorded in cli->cl_sf_wait (oinfo, its oi_cb_up
+ * upcall, started = 1) before the pages are flagged; everything from
+ * that point up to osc_wake_sync_fs() runs under cl_loi_list_lock.
+ *
+ * \param obd    OSC device; its embedded client_obd (obd->u.cli) is used
+ * \param oinfo  request descriptor; stored in cl_sf_wait.sfw_oi and its
+ *               oi_cb_up is stored as the upcall
+ * \param wait   not referenced anywhere in this function
+ *
+ * \retval 0                success (rc is never modified after init)
+ * \retval PTR_ERR(env)     cl_env_get() failed
+ */
+ struct client_obd *cli;
+ struct lov_oinfo *loi;
+ struct lov_oinfo *tloi; /* lookahead cursor for the _safe outer walk */
+ struct osc_async_page *oap;
+ struct osc_async_page *toap; /* lookahead cursor for the _safe inner walk */
+ struct loi_oap_pages *lop;
+ struct lu_env *env;
+ int refcheck;
+ int rc = 0;
+ ENTRY;
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env)); /* nothing acquired yet, so no cleanup needed */
+
+ cli = &obd->u.cli;
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ cli->cl_sf_wait.sfw_oi = oinfo;
+ cli->cl_sf_wait.sfw_upcall = oinfo->oi_cb_up;
+ cli->cl_sf_wait.started = 1;
+ /*
+ * creating cl_loi_sync_fs list: walk every object on cl_loi_write_list
+ * and, for each, every page on its write lop_pending list, setting
+ * ASYNC_SYNCFS through osc_set_async_flags_base(). The _safe iterators
+ * are used presumably because that helper may relink entries - confirm.
+ */
+ cfs_list_for_each_entry_safe(loi, tloi, &cli->cl_loi_write_list,
+ loi_write_item) {
+ lop = &loi->loi_write_lop;
+ cfs_list_for_each_entry_safe(oap, toap, &lop->lop_pending,
+ oap_pending_item)
+ osc_set_async_flags_base(cli, loi, oap, ASYNC_SYNCFS);
+ }
+
+ osc_check_rpcs(env, cli); /* still under cl_loi_list_lock */
+ osc_wake_sync_fs(cli); /* still under cl_loi_list_lock */
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ cl_env_put(env, &refcheck); /* pairs with cl_env_get() above */
+ RETURN(rc);
+}
+
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
return osc_process_config_base(obd, buf);
.o_unpackmd = osc_unpackmd,
.o_precreate = osc_precreate,
.o_create = osc_create,
+ .o_create_async = osc_create_async,
.o_destroy = osc_destroy,
.o_getattr = osc_getattr,
.o_getattr_async = osc_getattr_async,
.o_sync = osc_sync,
.o_enqueue = osc_enqueue,
.o_change_cbdata = osc_change_cbdata,
+ .o_find_cbdata = osc_find_cbdata,
.o_cancel = osc_cancel,
.o_cancel_unused = osc_cancel_unused,
.o_iocontrol = osc_iocontrol,
.o_llog_init = osc_llog_init,
.o_llog_finish = osc_llog_finish,
.o_process_config = osc_process_config,
+ .o_sync_fs = osc_sync_fs,
};
-extern struct lu_kmem_descr osc_caches[];
-extern spinlock_t osc_ast_guard;
-extern struct lock_class_key osc_ast_guard_class;
+extern struct lu_kmem_descr osc_caches[];
+extern cfs_spinlock_t osc_ast_guard;
+extern cfs_lock_class_key_t osc_ast_guard_class;
int __init osc_init(void)
{
lprocfs_osc_init_vars(&lvars);
- request_module("lquota");
+ cfs_request_module("lquota");
quota_interface = PORTAL_SYMBOL_GET(osc_quota_interface);
lquota_init(quota_interface);
init_obd_quota_ops(quota_interface, &osc_obd_ops);
RETURN(rc);
}
- spin_lock_init(&osc_ast_guard);
- lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
+ cfs_spin_lock_init(&osc_ast_guard);
+ cfs_lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
+
+ osc_mds_ost_orig_logops = llog_lvfs_ops;
+ osc_mds_ost_orig_logops.lop_setup = llog_obd_origin_setup;
+ osc_mds_ost_orig_logops.lop_cleanup = llog_obd_origin_cleanup;
+ osc_mds_ost_orig_logops.lop_add = llog_obd_origin_add;
+ osc_mds_ost_orig_logops.lop_connect = llog_origin_connect;
RETURN(rc);
}