* Use is subject to license terms.
*/
/*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
* This file is part of Lustre, http://www.lustre.org/
* Lustre is a trademark of Sun Microsystems, Inc.
*/
struct lov_mds_md *lmm, int lmm_bytes)
{
int lsm_size;
+ struct obd_import *imp = class_exp2cliimp(exp);
ENTRY;
if (lmm != NULL) {
LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
}
- (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+ if (imp != NULL &&
+ (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
+ (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
+ else
+ (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
RETURN(lsm_size);
}
oinfo->oi_cb_up, oinfo, rqset);
}
-static int osc_sync(struct obd_export *exp, struct obdo *oa,
- struct lov_stripe_md *md, obd_size start, obd_size end,
- void *capa)
+static int osc_sync_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ void *arg, int rc)
+{
+ struct osc_async_args *aa = arg;
+ struct ost_body *body;
+ ENTRY;
+
+ if (rc)
+ GOTO(out, rc);
+
+ body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ if (body == NULL) {
+ CERROR ("can't unpack ost_body\n");
+ GOTO(out, rc = -EPROTO);
+ }
+
+ *aa->aa_oi->oi_oa = body->oa;
+out:
+ rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
+ RETURN(rc);
+}
+
+static int osc_sync(struct obd_export *exp, struct obd_info *oinfo,
+ obd_size start, obd_size end,
+ struct ptlrpc_request_set *set)
{
struct ptlrpc_request *req;
struct ost_body *body;
+ struct osc_async_args *aa;
int rc;
ENTRY;
- if (!oa) {
+ if (!oinfo->oi_oa) {
CDEBUG(D_INFO, "oa NULL\n");
RETURN(-EINVAL);
}
if (req == NULL)
RETURN(-ENOMEM);
- osc_set_capa_size(req, &RMF_CAPA1, capa);
+ osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
if (rc) {
ptlrpc_request_free(req);
/* overload the size and blocks fields in the oa with start/end */
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
- lustre_set_wire_obdo(&body->oa, oa);
+ lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
body->oa.o_size = start;
body->oa.o_blocks = end;
body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
- osc_pack_capa(req, body, capa);
+ osc_pack_capa(req, body, oinfo->oi_capa);
ptlrpc_request_set_replen(req);
+ req->rq_interpret_reply = osc_sync_interpret;
- rc = ptlrpc_queue_wait(req);
- if (rc)
- GOTO(out, rc);
-
- body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- GOTO(out, rc = -EPROTO);
-
- lustre_get_wire_obdo(oa, &body->oa);
+ CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+ aa = ptlrpc_req_async_args(req);
+ aa->aa_oi = oinfo;
- EXIT;
- out:
- ptlrpc_req_finished(req);
- return rc;
+ ptlrpc_set_add_req(set, req);
+ RETURN (0);
}
/* Find and cancel locally locks matched by @mode in the resource found by
return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
-int osc_wake_sync_fs(struct client_obd *cli)
-{
- ENTRY;
- if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
- cli->cl_sf_wait.started) {
- cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, 0);
- cli->cl_sf_wait.started = 0;
- }
- RETURN(0);
-}
-
/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
CWARN("%s: available grant < 0, the OSS is probably not running"
" with patch from bug20278 (%ld) \n",
cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
- /* workaround for 1.6 servers which do not have
+ /* workaround for 1.6 servers which do not have
* the patch from bug20278 */
cli->cl_avail_grant = ocd->ocd_grant;
}
/* return error if any niobuf was in error */
for (i = 0; i < niocount; i++) {
- if (remote_rcs[i] < 0)
+ if ((int)remote_rcs[i] < 0)
return(remote_rcs[i]);
if (remote_rcs[i] != 0) {
struct lov_stripe_md *lsm, obd_count page_count,
struct brw_page **pga,
struct ptlrpc_request **reqp,
- struct obd_capa *ocapa, int reserve)
+ struct obd_capa *ocapa, int reserve,
+ int resend)
{
struct ptlrpc_request *req;
struct ptlrpc_bulk_desc *desc;
pg_prev = pga[0];
for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
struct brw_page *pg = pga[i];
+ int poff = pg->off & ~CFS_PAGE_MASK;
LASSERT(pg->count > 0);
- LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
- "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
- pg->off, pg->count);
+ /* make sure there is no gap in the middle of page array */
+ LASSERTF(page_count == 1 ||
+ (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
+ ergo(i > 0 && i < page_count - 1,
+ poff == 0 && pg->count == CFS_PAGE_SIZE) &&
+ ergo(i == page_count - 1, poff == 0)),
+ "i: %d/%d pg: %p off: "LPU64", count: %u\n",
+ i, page_count, pg, pg->off, pg->count);
#ifdef __linux__
LASSERTF(i == 0 || pg->off > pg_prev->off,
"i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
(pg->flag & OBD_BRW_SRVLOCK));
- ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
- pg->count);
+ ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
requested_nob += pg->count;
if (i > 0 && can_merge_pages(pg_prev, pg)) {
&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
+ if (resend) {
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+ body->oa.o_valid |= OBD_MD_FLFLAGS;
+ body->oa.o_flags = 0;
+ }
+ body->oa.o_flags |= OBD_FL_RECOV_RESEND;
+ }
+
if (osc_should_shrink_grant(cli))
osc_shrink_grant_local(cli, &body->oa);
if (server_cksum == ~0 && rc > 0) {
CERROR("Protocol error: server %s set the 'checksum' "
"bit, but didn't send a checksum. Not fatal, "
- "but please notify on http://bugzilla.lustre.org/\n",
+ "but please notify on http://bugs.whamcloud.com/\n",
libcfs_nid2str(peer->nid));
} else if (server_cksum != client_cksum) {
LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
restart_bulk:
rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
- page_count, pga, &req, ocapa, 0);
+ page_count, pga, &req, ocapa, 0, resends);
if (rc != 0)
return (rc);
ptlrpc_req_finished(req);
if (osc_recoverable_error(rc)) {
resends++;
- if (!osc_should_resend(resends, &exp->exp_obd->u.cli)) {
+ if (!client_should_resend(resends, &exp->exp_obd->u.cli)) {
CERROR("too many resend retries, returning error\n");
RETURN(-EIO);
}
int rc = 0;
ENTRY;
- if (!osc_should_resend(aa->aa_resends, aa->aa_cli)) {
+ if (!client_should_resend(aa->aa_resends, aa->aa_cli)) {
CERROR("too many resent retries, returning error\n");
RETURN(-EIO);
}
aa->aa_cli, aa->aa_oa,
NULL /* lsm unused by osc currently */,
aa->aa_page_count, aa->aa_ppga,
- &new_req, aa->aa_ocapa, 0);
+ &new_req, aa->aa_ocapa, 0, 1);
if (rc)
RETURN(rc);
osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
-static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
-{
- struct osc_async_page *oap;
- ENTRY;
-
- if (cfs_list_empty(&lop->lop_urgent))
- RETURN(0);
-
- oap = cfs_list_entry(lop->lop_urgent.next,
- struct osc_async_page, oap_urgent_item);
-
- if (oap->oap_async_flags & ASYNC_SYNCFS) {
- CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
- RETURN(1);
- }
-
- RETURN(0);
-}
/* This maintains the lists of pending pages to read/write for a given object
* (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
} else {
- if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
- on_list(&loi->loi_sync_fs_item,
- &cli->cl_loi_sync_fs_list,
- loi->loi_write_lop.lop_num_pending);
- } else {
- on_list(&loi->loi_hp_ready_item,
- &cli->cl_loi_hp_ready_list, 0);
- on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
- lop_makes_rpc(cli, &loi->loi_write_lop,
- OBD_BRW_WRITE)||
- lop_makes_rpc(cli, &loi->loi_read_lop,
- OBD_BRW_READ));
- }
+ on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+ lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
+ lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
}
on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
ar->ar_force_sync = 0;
}
-static int osc_add_to_lop_urgent(struct loi_oap_pages *lop,
- struct osc_async_page *oap,
- obd_flag async_flags)
-{
-
- /* If true, then already present in lop urgent */
- if (!cfs_list_empty(&oap->oap_urgent_item)) {
- CWARN("Request to add duplicate oap_urgent for flag = %d\n",
- oap->oap_async_flags);
- return 1;
- }
-
- /* item from sync_fs, to avoid duplicates check the existing flags */
- if (async_flags & ASYNC_SYNCFS) {
- cfs_list_add_tail(&oap->oap_urgent_item,
- &lop->lop_urgent);
- return 0;
- }
-
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- else if (oap->oap_async_flags & ASYNC_URGENT ||
- async_flags & ASYNC_URGENT)
- cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
-
- return 0;
-}
-
void osc_oap_to_pending(struct osc_async_page *oap)
{
struct loi_oap_pages *lop;
else
lop = &oap->oap_loi->loi_read_lop;
- osc_add_to_lop_urgent(lop, oap, 0);
+ if (oap->oap_async_flags & ASYNC_HP)
+ cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+ else if (oap->oap_async_flags & ASYNC_URGENT)
+ cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}
osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
}
osc_wake_cache_waiters(cli);
- osc_wake_sync_fs(cli);
osc_check_rpcs(env, cli);
client_obd_list_unlock(&cli->cl_loi_list_lock);
if (!async)
sort_brw_pages(pga, page_count);
rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
- pga, &req, crattr.cra_capa, 1);
+ pga, &req, crattr.cra_capa, 1, 0);
if (rc != 0) {
CERROR("prep_req failed: %d\n", rc);
GOTO(out, req = ERR_PTR(rc));
struct osc_brw_async_args *aa;
const struct obd_async_page_ops *ops;
CFS_LIST_HEAD(rpc_list);
- CFS_LIST_HEAD(tmp_list);
- unsigned int ending_offset;
- unsigned starting_offset = 0;
int srvlock = 0, mem_tight = 0;
struct cl_object *clob = NULL;
+ obd_off starting_offset = OBD_OBJECT_EOF;
+ unsigned int ending_offset;
+ int starting_page_off = 0;
ENTRY;
/* ASYNC_HP pages first. At present, when the lock the pages is
* to be canceled, the pages covered by the lock will be sent out
* with ASYNC_HP. We have to send out them as soon as possible. */
cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_move(&oap->oap_pending_item, &tmp_list);
- else
- cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
+ if (oap->oap_async_flags & ASYNC_HP)
+ cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
if (++page_count >= cli->cl_max_pages_per_rpc)
break;
}
-
- cfs_list_splice(&tmp_list, &lop->lop_pending);
page_count = 0;
/* first we find the pages we're allowed to work with */
/* If there is a gap at the start of this page, it can't merge
* with any previous page, so we'll hand the network a
* "fragmented" page array that it can't transfer in 1 RDMA */
- if (page_count != 0 && oap->oap_page_off != 0)
+ if (oap->oap_obj_off < starting_offset) {
+ if (starting_page_off != 0)
+ break;
+
+ starting_page_off = oap->oap_page_off;
+ starting_offset = oap->oap_obj_off + starting_page_off;
+ } else if (oap->oap_page_off != 0)
break;
/* in llite being 'ready' equates to the page being locked
lop_update_pending(cli, lop, cmd, -1);
cfs_list_del_init(&oap->oap_urgent_item);
- if (page_count == 0)
- starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
- (PTLRPC_MAX_BRW_SIZE - 1);
-
/* ask the caller for the size of the io as the rpc leaves. */
if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
oap->oap_count =
/* now put the page back in our accounting */
cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+ if (page_count++ == 0)
+ srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
+
if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
mem_tight = 1;
- if (page_count == 0)
- srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
- if (++page_count >= cli->cl_max_pages_per_rpc)
- break;
/* End on a PTLRPC_MAX_BRW_SIZE boundary. We want full-sized
* RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
* have the same alignment as the initial writes that allocated
* extents on the server. */
- ending_offset = (oap->oap_obj_off + oap->oap_page_off +
- oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
- if (ending_offset == 0)
+ ending_offset = oap->oap_obj_off + oap->oap_page_off +
+ oap->oap_count;
+ if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
+ break;
+
+ if (page_count >= cli->cl_max_pages_per_rpc)
break;
/* If there is a gap at the end of this page, it can't merge
}
osc_wake_cache_waiters(cli);
- osc_wake_sync_fs(cli);
+
loi_list_maint(cli, loi);
client_obd_list_unlock(&cli->cl_loi_list_lock);
aa = ptlrpc_req_async_args(req);
+ starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
if (cmd == OBD_BRW_READ) {
lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
if (!cfs_list_empty(&cli->cl_loi_ready_list))
RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
struct lov_oinfo, loi_ready_item));
- if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
- RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
- struct lov_oinfo, loi_sync_fs_item));
/* then if we have cache waiters, return all objects with queued
* writes. This is especially important when many small files
if (rc > 0)
race_counter = 0;
- else
+ else if (rc == 0)
race_counter++;
}
if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
if (rc > 0)
race_counter = 0;
- else
+ else if (rc == 0)
race_counter++;
}
cfs_list_del_init(&loi->loi_write_item);
if (!cfs_list_empty(&loi->loi_read_item))
cfs_list_del_init(&loi->loi_read_item);
- if (!cfs_list_empty(&loi->loi_sync_fs_item))
- cfs_list_del_init(&loi->loi_sync_fs_item);
loi_list_maint(cli, loi);
struct osc_async_page *oap)
{
struct osc_cache_waiter ocw;
- struct l_wait_info lwi = { 0 };
+ struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
ENTRY;
/* force the caller to try sync io. this can jump the list
* of queued writes and create a discontiguous rpc stream */
- if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
- loi->loi_ar.ar_force_sync)
+ if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+ cli->cl_dirty_max < CFS_PAGE_SIZE ||
+ cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
RETURN(-EDQUOT);
/* Hopefully normal case - cache space and write credits available */
RETURN(0);
}
-struct osc_async_page *oap_from_cookie(void *cookie)
-{
- struct osc_async_page *oap = cookie;
- if (oap->oap_magic != OAP_MAGIC)
- return ERR_PTR(-EINVAL);
- return oap;
-};
-
-int osc_queue_async_io(const struct lu_env *env,
- struct obd_export *exp, struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, void *cookie,
- int cmd, obd_off off, int count,
- obd_flag brw_flags, enum async_flags async_flags)
+int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
+ struct lov_stripe_md *lsm, struct lov_oinfo *loi,
+ struct osc_async_page *oap, int cmd, int off,
+ int count, obd_flag brw_flags, enum async_flags async_flags)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
- struct osc_async_page *oap;
int rc = 0;
ENTRY;
- oap = oap_from_cookie(cookie);
- if (IS_ERR(oap))
- RETURN(PTR_ERR(oap));
+ if (oap->oap_magic != OAP_MAGIC)
+ RETURN(-EINVAL);
if (cli->cl_import == NULL || cli->cl_import->imp_invalid)
RETURN(-EIO);
if ((oap->oap_async_flags & async_flags) == async_flags)
RETURN(0);
- /* XXX: This introduces a tiny insignificant race for the case if this
- * loi already had other urgent items.
- */
- if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
- cfs_list_empty(&oap->oap_rpc_item) &&
- cfs_list_empty(&oap->oap_urgent_item)) {
- osc_add_to_lop_urgent(lop, oap, ASYNC_SYNCFS);
- flags |= ASYNC_SYNCFS;
- cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags |= flags;
- cfs_spin_unlock(&oap->oap_lock);
- loi_list_maint(cli, loi);
- RETURN(0);
- }
-
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
flags |= ASYNC_READY;
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
cfs_list_empty(&oap->oap_rpc_item)) {
- osc_add_to_lop_urgent(lop, oap, ASYNC_URGENT);
+ if (oap->oap_async_flags & ASYNC_HP)
+ cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+ else
+ cfs_list_add_tail(&oap->oap_urgent_item,
+ &lop->lop_urgent);
flags |= ASYNC_URGENT;
loi_list_maint(cli, loi);
}
RETURN(0);
}
-int osc_teardown_async_page(struct obd_export *exp,
- struct lov_stripe_md *lsm,
- struct lov_oinfo *loi, void *cookie)
+int osc_teardown_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
+ struct lov_oinfo *loi, struct osc_async_page *oap)
{
struct client_obd *cli = &exp->exp_obd->u.cli;
struct loi_oap_pages *lop;
- struct osc_async_page *oap;
int rc = 0;
ENTRY;
- oap = oap_from_cookie(cookie);
- if (IS_ERR(oap))
- RETURN(PTR_ERR(oap));
+ if (oap->oap_magic != OAP_MAGIC)
+ RETURN(-EINVAL);
if (loi == NULL)
loi = lsm->lsm_oinfo[0];
if (!cfs_list_empty(&oap->oap_urgent_item)) {
cfs_list_del_init(&oap->oap_urgent_item);
cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP |
- ASYNC_SYNCFS);
+ oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
cfs_spin_unlock(&oap->oap_lock);
}
if (!cfs_list_empty(&oap->oap_pending_item)) {
RETURN(rc);
}
-static void osc_set_lock_data_with_check(struct ldlm_lock *lock,
- struct ldlm_enqueue_info *einfo,
- int flags)
+static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
+ struct ldlm_enqueue_info *einfo)
{
void *data = einfo->ei_cbdata;
+ int set = 0;
LASSERT(lock != NULL);
LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
lock_res_and_lock(lock);
cfs_spin_lock(&osc_ast_guard);
- LASSERT(lock->l_ast_data == NULL || lock->l_ast_data == data);
- lock->l_ast_data = data;
+
+ if (lock->l_ast_data == NULL)
+ lock->l_ast_data = data;
+ if (lock->l_ast_data == data)
+ set = 1;
+
cfs_spin_unlock(&osc_ast_guard);
unlock_res_and_lock(lock);
+
+ return set;
}
-static void osc_set_data_with_check(struct lustre_handle *lockh,
- struct ldlm_enqueue_info *einfo,
- int flags)
+static int osc_set_data_with_check(struct lustre_handle *lockh,
+ struct ldlm_enqueue_info *einfo)
{
struct ldlm_lock *lock = ldlm_handle2lock(lockh);
+ int set = 0;
if (lock != NULL) {
- osc_set_lock_data_with_check(lock, einfo, flags);
+ set = osc_set_lock_data_with_check(lock, einfo);
LDLM_LOCK_PUT(lock);
} else
CERROR("lockh %p, data %p - client evicted?\n",
lockh, einfo->ei_cbdata);
+ return set;
}
static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
if (mode) {
struct ldlm_lock *matched = ldlm_handle2lock(lockh);
- if (matched->l_ast_data == NULL ||
- matched->l_ast_data == einfo->ei_cbdata) {
+ if (osc_set_lock_data_with_check(matched, einfo)) {
/* addref the lock only if not async requests and PW
* lock is matched whereas we asked for PR. */
if (!rqset && einfo->ei_mode != mode)
ldlm_lock_addref(lockh, LCK_PR);
- osc_set_lock_data_with_check(matched, einfo, *flags);
if (intent) {
/* I would like to be able to ASSERT here that
* rss <= kms, but I can't, for reasons which
rc = ldlm_lock_match(obd->obd_namespace, lflags,
res_id, type, policy, rc, lockh, unref);
if (rc) {
- if (data != NULL)
- osc_set_data_with_check(lockh, data, lflags);
+ if (data != NULL) {
+ if (!osc_set_data_with_check(lockh, data)) {
+ if (!(lflags & LDLM_FL_TEST_LOCK))
+ ldlm_lock_decref(lockh, rc);
+ RETURN(0);
+ }
+ }
if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
ldlm_lock_addref(lockh, LCK_PR);
ldlm_lock_decref(lockh, LCK_PW);
* avail < ~0.1% max max = avail + used
* 1025 * avail < avail + used used = blocks - free
* 1024 * avail < used
- * 1024 * avail < blocks - free
- * avail < ((blocks - free) >> 10)
+ * 1024 * avail < blocks - free
+ * avail < ((blocks - free) >> 10)
*
* On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
* lose that amount of space so in those cases we report no space left
rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
break;
}
+ case IMP_EVENT_DEACTIVATE: {
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
+ break;
+ }
+ case IMP_EVENT_ACTIVATE: {
+ rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
+ break;
+ }
default:
CERROR("Unknown import event %d\n", event);
LBUG();
break;
}
case OBD_CLEANUP_EXPORTS: {
- /* If we set up but never connected, the
- client import will not have been cleaned. */
- if (obd->u.cli.cl_import) {
- struct obd_import *imp;
- cfs_down_write(&obd->u.cli.cl_sem);
- imp = obd->u.cli.cl_import;
- CDEBUG(D_CONFIG, "%s: client import never connected\n",
- obd->obd_name);
- ptlrpc_invalidate_import(imp);
- if (imp->imp_rq_pool) {
- ptlrpc_free_rq_pool(imp->imp_rq_pool);
- imp->imp_rq_pool = NULL;
- }
- class_destroy_import(imp);
- cfs_up_write(&obd->u.cli.cl_sem);
- obd->u.cli.cl_import = NULL;
- }
+ /* LU-464
+ * for echo client, export may be on zombie list, wait for
+ * zombie thread to cull it, because cli.cl_import will be
+ * cleared in client_disconnect_export():
+ * class_export_destroy() -> obd_cleanup() ->
+ * echo_device_free() -> echo_client_cleanup() ->
+ * obd_disconnect() -> osc_disconnect() ->
+ * client_disconnect_export()
+ */
+ obd_zombie_barrier();
+ obd_cleanup_client_import(obd);
rc = obd_llog_finish(obd, 0);
if (rc != 0)
CERROR("failed to cleanup llogging subsystems\n");
return(rc);
}
-static int osc_sync_fs(struct obd_export *exp, struct obd_info *oinfo,
- int wait)
-{
- struct obd_device *obd = class_exp2obd(exp);
- struct client_obd *cli;
- struct lov_oinfo *loi;
- struct lov_oinfo *tloi;
- struct osc_async_page *oap;
- struct osc_async_page *toap;
- struct loi_oap_pages *lop;
- struct lu_env *env;
- int refcheck;
- int rc = 0;
- ENTRY;
-
- env = cl_env_get(&refcheck);
- if (IS_ERR(env))
- RETURN(PTR_ERR(env));
-
- cli = &obd->u.cli;
- client_obd_list_lock(&cli->cl_loi_list_lock);
- cli->cl_sf_wait.sfw_oi = oinfo;
- cli->cl_sf_wait.sfw_upcall = oinfo->oi_cb_up;
- cli->cl_sf_wait.started = 1;
- /* creating cl_loi_sync_fs list */
- cfs_list_for_each_entry_safe(loi, tloi, &cli->cl_loi_write_list,
- loi_write_item) {
- lop = &loi->loi_write_lop;
- cfs_list_for_each_entry_safe(oap, toap, &lop->lop_pending,
- oap_pending_item)
- osc_set_async_flags_base(cli, loi, oap, ASYNC_SYNCFS);
- }
- osc_check_rpcs(env, cli);
- osc_wake_sync_fs(cli);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
- cl_env_put(env, &refcheck);
-
- RETURN(rc);
-}
-
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
return osc_process_config_base(obd, buf);
.o_llog_init = osc_llog_init,
.o_llog_finish = osc_llog_finish,
.o_process_config = osc_process_config,
- .o_sync_fs = osc_sync_fs,
};
extern struct lu_kmem_descr osc_caches[];