struct ptlrpc_request *req,
void *arg, int rc)
{
- struct osc_async_args *aa = arg;
+ struct osc_fsync_args *fa = arg;
struct ost_body *body;
ENTRY;
GOTO(out, rc = -EPROTO);
}
- *aa->aa_oi->oi_oa = body->oa;
+ *fa->fa_oi->oi_oa = body->oa;
out:
- rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
- RETURN(rc);
+ rc = fa->fa_upcall(fa->fa_cookie, rc);
+ RETURN(rc);
}
-static int osc_sync(const struct lu_env *env, struct obd_export *exp,
- struct obd_info *oinfo, obd_size start, obd_size end,
- struct ptlrpc_request_set *set)
+int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
+ obd_enqueue_update_f upcall, void *cookie,
+ struct ptlrpc_request_set *rqset)
{
- struct ptlrpc_request *req;
- struct ost_body *body;
- struct osc_async_args *aa;
+ struct ptlrpc_request *req;
+ struct ost_body *body;
+ struct osc_fsync_args *fa;
int rc;
ENTRY;
- if (!oinfo->oi_oa) {
- CDEBUG(D_INFO, "oa NULL\n");
- RETURN(-EINVAL);
- }
-
req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
if (req == NULL)
RETURN(-ENOMEM);
body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
LASSERT(body);
lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
- body->oa.o_size = start;
- body->oa.o_blocks = end;
- body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
osc_pack_capa(req, body, oinfo->oi_capa);
ptlrpc_request_set_replen(req);
req->rq_interpret_reply = osc_sync_interpret;
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- aa->aa_oi = oinfo;
+ CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
+ fa = ptlrpc_req_async_args(req);
+ fa->fa_oi = oinfo;
+ fa->fa_upcall = upcall;
+ fa->fa_cookie = cookie;
- ptlrpc_set_add_req(set, req);
- RETURN (0);
+ if (rqset == PTLRPCD_SET)
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ else
+ ptlrpc_set_add_req(rqset, req);
+
+ RETURN(0);
+}
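
For context, the async-args container threaded through osc_sync_interpret() and osc_sync_base() would be declared along these lines. The three field names appear in this patch, but the exact layout (normally kept in osc_internal.h) is an assumption, not part of the change:

    /* Sketch only: must fit inside req->rq_async_args, which the
     * CLASSERT in osc_sync_base() enforces. */
    struct osc_fsync_args {
            struct obd_info      *fa_oi;     /* oi_oa receives the reply body */
            obd_enqueue_update_f  fa_upcall; /* completion upcall */
            void                 *fa_cookie; /* opaque cookie for fa_upcall */
    };
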
+
+static int osc_sync(const struct lu_env *env, struct obd_export *exp,
+ struct obd_info *oinfo, obd_size start, obd_size end,
+ struct ptlrpc_request_set *set)
+{
+ ENTRY;
+
+ if (!oinfo->oi_oa) {
+ CDEBUG(D_INFO, "oa NULL\n");
+ RETURN(-EINVAL);
+ }
+
+ oinfo->oi_oa->o_size = start;
+ oinfo->oi_oa->o_blocks = end;
+ oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
+
+ RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
}
/* Find and cancel locally locks matched by @mode in the resource found by
(cli->cl_max_rpcs_in_flight + 1);
oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
}
- oa->o_grant = cli->cl_avail_grant;
+ oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
oa->o_dropped = cli->cl_lost_grant;
cli->cl_lost_grant = 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
cli->cl_avail_grant = ocd->ocd_grant;
}
- client_obd_list_unlock(&cli->cl_loi_list_lock);
+ /* determine the appropriate chunk size used by osc_extent. */
+ cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
- cli->cl_import->imp_obd->obd_name,
- cli->cl_avail_grant, cli->cl_lost_grant);
+ CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld "
+ "chunk bits: %d\n", cli->cl_import->imp_obd->obd_name,
+ cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
- if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
- cfs_list_empty(&cli->cl_grant_shrink_list))
- osc_add_shrink_grant(cli);
+ if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
+ cfs_list_empty(&cli->cl_grant_shrink_list))
+ osc_add_shrink_grant(cli);
}
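
The cl_chunkbits calculation above picks the larger of the client page shift and the server-advertised block size, so an osc_extent chunk never covers less than one server block. A minimal userspace illustration of the arithmetic (the two input values are assumptions for the example, not protocol constants):

    #include <stdio.h>

    /* Stand-in for the kernel's max_t(int, a, b). */
    static int max_int(int a, int b) { return a > b ? a : b; }

    int main(void)
    {
            int page_shift    = 12; /* assume 4 KiB client pages (CFS_PAGE_SHIFT) */
            int ocd_blocksize = 16; /* assume the server reports 64 KiB blocks */
            int chunkbits     = max_int(page_shift, ocd_blocksize);

            /* prints "chunk size: 65536 bytes" -- one chunk per server block */
            printf("chunk size: %d bytes\n", 1 << chunkbits);
            return 0;
    }
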
/* We assume that the reason this OSC got a short read is because it read
struct osc_brw_async_args *aa)
{
struct ptlrpc_request *new_req;
- struct ptlrpc_request_set *set = request->rq_set;
struct osc_brw_async_args *new_aa;
struct osc_async_page *oap;
int rc = 0;
if (rc)
RETURN(rc);
- client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
-
cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
if (oap->oap_request != NULL) {
LASSERTF(request == oap->oap_request,
"request %p != oap_request %p\n",
request, oap->oap_request);
if (oap->oap_interrupted) {
- client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
ptlrpc_req_finished(new_req);
RETURN(-EINTR);
}
new_aa = ptlrpc_req_async_args(new_req);
CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
- cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
- CFS_INIT_LIST_HEAD(&aa->aa_oaps);
+ cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
+ CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
+ cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
if (oap->oap_request) {
new_aa->aa_ocapa = aa->aa_ocapa;
aa->aa_ocapa = NULL;
- /* use ptlrpc_set_add_req is safe because interpret functions work
- * in check_set context. only one way exist with access to request
- * from different thread got -EINTR - this way protected with
- * cl_loi_list_lock */
- ptlrpc_set_add_req(set, new_req);
+ /* XXX: This code will run into problems if we want to add a series of
+ * BRW RPCs to a self-defined ptlrpc_request_set and wait for all of
+ * them to finish. In that case the new request should inherit the
+ * request set from the old request. */
+ ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
- client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
-
- DEBUG_REQ(D_INFO, new_req, "new request");
- RETURN(0);
+ DEBUG_REQ(D_INFO, new_req, "new request");
+ RETURN(0);
}
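
The move to cfs_list_splice_init() above is behavior-preserving: it transfers every entry and reinitializes the source head in one call, collapsing the old two-step sequence. Side by side:

    /* before: splice, then reinitialize the emptied source head */
    cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
    CFS_INIT_LIST_HEAD(&aa->aa_oaps);

    /* after: one call with the same net effect */
    cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);

The same pattern carries the new aa_exts list across to the redo request.
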
/*
static int brw_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
- struct osc_brw_async_args *aa = data;
- struct osc_async_page *oap, *tmp;
- struct client_obd *cli;
+ struct osc_brw_async_args *aa = data;
+ struct osc_extent *ext;
+ struct osc_extent *tmp;
+ struct cl_object *obj = NULL;
+ struct client_obd *cli = aa->aa_cli;
ENTRY;
rc = osc_brw_fini_request(req, rc);
aa->aa_ocapa = NULL;
}
- cli = aa->aa_cli;
- client_obd_list_lock(&cli->cl_loi_list_lock);
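+ /* Finish every extent in this RPC; take one reference on the (single)
+ * object behind them so its attributes can be refreshed after the
+ * loop, but only when the RPC succeeded. */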
+ cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
+ if (obj == NULL && rc == 0) {
+ obj = osc2cl(ext->oe_obj);
+ cl_object_get(obj);
+ }
- /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
- * is called so we know whether to go to sync BRWs or wait for more
- * RPCs to complete */
- if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
- cli->cl_w_in_flight--;
- else
- cli->cl_r_in_flight--;
-
- /* the caller may re-use the oap after the completion call so
- * we need to clean it up a little */
- cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
- oap_rpc_item) {
- cfs_list_del_init(&oap->oap_rpc_item);
- osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
+ cfs_list_del_init(&ext->oe_link);
+ osc_extent_finish(env, ext, 1, rc);
+ }
+ LASSERT(cfs_list_empty(&aa->aa_exts));
+ LASSERT(cfs_list_empty(&aa->aa_oaps));
+
+ if (obj != NULL) {
+ struct obdo *oa = aa->aa_oa;
+ struct cl_attr *attr = &osc_env_info(env)->oti_attr;
+ unsigned long valid = 0;
+
+ LASSERT(rc == 0);
+ if (oa->o_valid & OBD_MD_FLBLOCKS) {
+ attr->cat_blocks = oa->o_blocks;
+ valid |= CAT_BLOCKS;
+ }
+ if (oa->o_valid & OBD_MD_FLMTIME) {
+ attr->cat_mtime = oa->o_mtime;
+ valid |= CAT_MTIME;
+ }
+ if (oa->o_valid & OBD_MD_FLATIME) {
+ attr->cat_atime = oa->o_atime;
+ valid |= CAT_ATIME;
+ }
+ if (oa->o_valid & OBD_MD_FLCTIME) {
+ attr->cat_ctime = oa->o_ctime;
+ valid |= CAT_CTIME;
+ }
+ if (valid != 0) {
+ cl_object_attr_lock(obj);
+ cl_object_attr_set(env, obj, attr, valid);
+ cl_object_attr_unlock(obj);
+ }
+ cl_object_put(env, obj);
}
OBDO_FREE(aa->aa_oa);
- osc_wake_cache_waiters(cli);
- osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
-
cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
req->rq_bulk->bd_nob_transferred);
osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
+ * is called so we know whether to go to sync BRWs or wait for more
+ * RPCs to complete */
+ if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
+ cli->cl_w_in_flight--;
+ else
+ cli->cl_r_in_flight--;
+ osc_wake_cache_waiters(cli);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
RETURN(rc);
}
-/* The most tricky part of this function is that it will return with
- * cli->cli_loi_list_lock held.
+/**
+ * Build an RPC from the list of extents @ext_list. The caller must ensure
+ * that the total number of pages in the list does not exceed the maximum
+ * pages per RPC, and that every extent is in OES_RPC state. (A schematic
+ * caller is sketched after the function body.)
*/
int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
- cfs_list_t *rpc_list, int page_count, int cmd,
- pdl_policy_t pol)
+ cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
{
struct ptlrpc_request *req = NULL;
+ struct osc_extent *ext;
+ CFS_LIST_HEAD(rpc_list);
struct brw_page **pga = NULL;
struct osc_brw_async_args *aa = NULL;
struct obdo *oa = NULL;
enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
struct ldlm_lock *lock = NULL;
struct cl_req_attr crattr;
- int i, rc, mpflag = 0;
-
- ENTRY;
- LASSERT(!cfs_list_empty(rpc_list));
+ obd_off starting_offset = OBD_OBJECT_EOF;
+ obd_off ending_offset = 0;
+ int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
+
+ ENTRY;
+ LASSERT(!cfs_list_empty(ext_list));
+
+ /* Add the pages of every extent into rpc_list to build the BRW RPC.
+ * Only the first page of the RPC may start at a non-zero page offset,
+ * and only the last may end short of a full page; the LASSERTs below
+ * encode that alignment invariant. */
+ cfs_list_for_each_entry(ext, ext_list, oe_link) {
+ LASSERT(ext->oe_state == OES_RPC);
+ mem_tight |= ext->oe_memalloc;
+ cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
+ ++page_count;
+ cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+ if (starting_offset > oap->oap_obj_off)
+ starting_offset = oap->oap_obj_off;
+ else
+ LASSERT(oap->oap_page_off == 0);
+ if (ending_offset < oap->oap_obj_off + oap->oap_count)
+ ending_offset = oap->oap_obj_off +
+ oap->oap_count;
+ else
+ LASSERT(oap->oap_page_off + oap->oap_count ==
+ CFS_PAGE_SIZE);
+ }
+ }
- if (cmd & OBD_BRW_MEMALLOC)
- mpflag = cfs_memory_pressure_get_and_set();
+ if (mem_tight)
+ mpflag = cfs_memory_pressure_get_and_set();
- memset(&crattr, 0, sizeof crattr);
- OBD_ALLOC(pga, sizeof(*pga) * page_count);
- if (pga == NULL)
+ memset(&crattr, 0, sizeof crattr);
+ OBD_ALLOC(pga, sizeof(*pga) * page_count);
+ if (pga == NULL)
GOTO(out, rc = -ENOMEM);
OBDO_ALLOC(oa);
GOTO(out, rc = -ENOMEM);
i = 0;
- cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
- struct cl_page *page = osc_oap2cl_page(oap);
+ cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
+ struct cl_page *page = oap2cl_page(oap);
if (clerq == NULL) {
clerq = cl_req_alloc(env, page, crt,
1 /* only 1-object rpcs for
* now */);
if (IS_ERR(clerq))
GOTO(out, rc = PTR_ERR(clerq));
- lock = oap->oap_ldlm_lock;
- }
+ lock = oap->oap_ldlm_lock;
+ }
+ if (mem_tight)
+ oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
pga[i] = &oap->oap_brw_page;
pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
/* always get the data for the obdo for the rpc */
LASSERT(clerq != NULL);
- crattr.cra_oa = oa;
- crattr.cra_capa = NULL;
+ crattr.cra_oa = oa;
+ crattr.cra_capa = NULL;
memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
cl_req_attr_set(env, clerq, &crattr, ~0ULL);
if (lock) {
if (rc != 0) {
CERROR("cl_req_prep failed: %d\n", rc);
GOTO(out, rc);
- }
+ }
- sort_brw_pages(pga, page_count);
- rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
- pga, &req, crattr.cra_capa, 1, 0);
- if (rc != 0) {
- CERROR("prep_req failed: %d\n", rc);
+ sort_brw_pages(pga, page_count);
+ rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
+ pga, &req, crattr.cra_capa, 1, 0);
+ if (rc != 0) {
+ CERROR("prep_req failed: %d\n", rc);
GOTO(out, rc);
}
req->rq_interpret_reply = brw_interpret;
- if (cmd & OBD_BRW_MEMALLOC)
+ if (mem_tight != 0)
req->rq_memalloc = 1;
/* Need to update the timestamps after the request is built in case
lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
- CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
- aa = ptlrpc_req_async_args(req);
- CFS_INIT_LIST_HEAD(&aa->aa_oaps);
- cfs_list_splice(rpc_list, &aa->aa_oaps);
- CFS_INIT_LIST_HEAD(rpc_list);
- aa->aa_clerq = clerq;
+ CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+ aa = ptlrpc_req_async_args(req);
+ CFS_INIT_LIST_HEAD(&aa->aa_oaps);
+ cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
+ CFS_INIT_LIST_HEAD(&aa->aa_exts);
+ cfs_list_splice_init(ext_list, &aa->aa_exts);
+ aa->aa_clerq = clerq;
+
+ /* Queued sync pages can be torn down while the pages sit between the
+ * pending list and the RPC, so propagate any interruption to the new
+ * request and keep one request reference on a single oap for redo and
+ * abort handling. */
+ tmp = NULL;
+ cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+ /* only one oap gets a request reference */
+ if (tmp == NULL)
+ tmp = oap;
+ if (oap->oap_interrupted && !req->rq_intr) {
+ CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
+ oap, req);
+ ptlrpc_mark_interrupted(req);
+ }
+ }
+ if (tmp != NULL)
+ tmp->oap_request = ptlrpc_request_addref(req);
+
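+ /* Account this RPC in the per-client stats under cl_loi_list_lock:
+ * page-count and RPCs-in-flight histograms, plus the starting page
+ * index of the transfer. */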
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ starting_offset >>= CFS_PAGE_SHIFT;
+ if (cmd == OBD_BRW_READ) {
+ cli->cl_r_in_flight++;
+ lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
+ lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
+ lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
+ starting_offset + 1);
+ } else {
+ cli->cl_w_in_flight++;
+ lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
+ lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
+ lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
+ starting_offset + 1);
+ }
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+ DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
+ page_count, aa, cli->cl_r_in_flight,
+ cli->cl_w_in_flight);
+
+ /* XXX: Maybe the caller can check the RPC bulk descriptor to
+ * see which CPU/NUMA node the majority of pages were allocated
+ * on, and try to assign the async RPC to the CPU core
+ * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
+ *
+ * But on the other hand, we expect that multiple ptlrpcd
+ * threads and the initial write sponsor can run in parallel,
+ * especially when data checksum is enabled, which is CPU-bound
+ * operation and single ptlrpcd thread cannot process in time.
+ * So more ptlrpcd threads sharing BRW load
+ * (with PDL_POLICY_ROUND) seems better.
+ */
+ ptlrpcd_add_req(req, pol, -1);
+ rc = 0;
+ EXIT;
+
out:
- if (cmd & OBD_BRW_MEMALLOC)
- cfs_memory_pressure_restore(mpflag);
+ if (mem_tight != 0)
+ cfs_memory_pressure_restore(mpflag);
- capa_put(crattr.cra_capa);
+ capa_put(crattr.cra_capa);
if (rc != 0) {
LASSERT(req == NULL);
OBD_FREE(pga, sizeof(*pga) * page_count);
/* this should happen rarely and is pretty bad, it makes the
* pending list not follow the dirty order */
- client_obd_list_lock(&cli->cl_loi_list_lock);
- cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
- cfs_list_del_init(&oap->oap_rpc_item);
-
- /* queued sync pages can be torn down while the pages
- * were between the pending list and the rpc */
- if (oap->oap_interrupted) {
- CDEBUG(D_INODE, "oap %p interrupted\n", oap);
- osc_ap_completion(env, cli, NULL, oap, 0,
- oap->oap_count);
- continue;
- }
- osc_ap_completion(env, cli, NULL, oap, 0, rc);
+ while (!cfs_list_empty(ext_list)) {
+ ext = cfs_list_entry(ext_list->next, struct osc_extent,
+ oe_link);
+ cfs_list_del_init(&ext->oe_link);
+ osc_extent_finish(env, ext, 0, rc);
}
if (clerq && !IS_ERR(clerq))
cl_req_completion(env, clerq, rc);
- } else {
- struct osc_async_page *tmp = NULL;
-
- /* queued sync pages can be torn down while the pages
- * were between the pending list and the rpc */
- LASSERT(aa != NULL);
- client_obd_list_lock(&cli->cl_loi_list_lock);
- cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
- /* only one oap gets a request reference */
- if (tmp == NULL)
- tmp = oap;
- if (oap->oap_interrupted && !req->rq_intr) {
- CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
- oap, req);
- ptlrpc_mark_interrupted(req);
- }
- }
- if (tmp != NULL)
- tmp->oap_request = ptlrpc_request_addref(req);
-
- DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight",
- page_count, aa, cli->cl_r_in_flight,
- cli->cl_w_in_flight);
-
- /* XXX: Maybe the caller can check the RPC bulk descriptor to
- * see which CPU/NUMA node the majority of pages were allocated
- * on, and try to assign the async RPC to the CPU core
- * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
- *
- * But on the other hand, we expect that multiple ptlrpcd
- * threads and the initial write sponsor can run in parallel,
- * especially when data checksum is enabled, which is CPU-bound
- * operation and single ptlrpcd thread cannot process in time.
- * So more ptlrpcd threads sharing BRW load
- * (with PDL_POLICY_ROUND) seems better.
- */
- ptlrpcd_add_req(req, pol, -1);
}
RETURN(rc);
}
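
As referenced in the function comment, a schematic caller of osc_build_rpc() looks like the following. The helper name and the stop-early policy are hypothetical (the real dispatch path lives elsewhere in this patch), but oe_state, oe_nr_pages, and cl_max_pages_per_rpc are existing fields:

    /* Hypothetical sketch: batch ready extents into one BRW write RPC
     * without exceeding the per-RPC page limit. */
    static int osc_send_write_rpc_sketch(const struct lu_env *env,
                                         struct client_obd *cli,
                                         cfs_list_t *ready_exts)
    {
            CFS_LIST_HEAD(rpc_list);
            struct osc_extent *ext;
            struct osc_extent *next;
            int page_count = 0;
            ENTRY;

            cfs_list_for_each_entry_safe(ext, next, ready_exts, oe_link) {
                    LASSERT(ext->oe_state == OES_RPC);
                    if (page_count + ext->oe_nr_pages >
                        cli->cl_max_pages_per_rpc)
                            break;
                    page_count += ext->oe_nr_pages;
                    cfs_list_move_tail(&ext->oe_link, &rpc_list);
            }
            if (cfs_list_empty(&rpc_list))
                    RETURN(0);

            /* osc_build_rpc() consumes rpc_list: on success the extents
             * move into the request's async args; on failure they are
             * finished with the error code. */
            RETURN(osc_build_rpc(env, cli, &rpc_list, OBD_BRW_WRITE,
                                 PDL_POLICY_ROUND));
    }
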
cli->cl_lost_grant = 0;
client_obd_list_unlock(&cli->cl_loi_list_lock);
- CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
- "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
- cli->cl_avail_grant, cli->cl_dirty, lost_grant);
CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
- " ocd_grant: %d\n", data->ocd_connect_flags,
- data->ocd_version, data->ocd_grant);
- }
+ " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
+ data->ocd_version, data->ocd_grant, lost_grant);
+ }
- RETURN(0);
+ RETURN(0);
}
static int osc_disconnect(struct obd_export *exp)
if (obd->u.cli.cl_conn_count == 1) {
/* Flush any remaining cancel messages out to the
* target */
- llog_sync(ctxt, exp);
+ llog_sync(ctxt, exp, 0);
}
llog_ctxt_put(ctxt);
} else {
if (!IS_ERR(env)) {
/* Reset grants */
cli = &obd->u.cli;
- client_obd_list_lock(&cli->cl_loi_list_lock);
/* all pages go to failing rpcs due to the invalid
* import */
osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
cl_env_put(env, &refcheck);
CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
- client_obd_list_lock(&cli->cl_loi_list_lock);
osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
- client_obd_list_unlock(&cli->cl_loi_list_lock);
RETURN(0);
}