return cli->cl_r_in_flight + cli->cl_w_in_flight;
}
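+
+/* Caller must hold loi_list_lock. Completes the sync_fs waiter's
+ * upcall once the sync_fs list has drained. */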
+int osc_wake_sync_fs(struct client_obd *cli)
+{
+ ENTRY;
+ if (cfs_list_empty(&cli->cl_loi_sync_fs_list) &&
+ cli->cl_sf_wait.started) {
+ cli->cl_sf_wait.sfw_upcall(cli->cl_sf_wait.sfw_oi, 0);
+ cli->cl_sf_wait.started = 0;
+ }
+ RETURN(0);
+}
+
/* caller must hold loi_list_lock */
void osc_wake_cache_waiters(struct client_obd *cli)
{
oa->o_grant = cli->cl_avail_grant / 4;
cli->cl_avail_grant -= oa->o_grant;
client_obd_list_unlock(&cli->cl_loi_list_lock);
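+ /* o_flags is only valid when OBD_MD_FLFLAGS is set, so make
+ * sure it is initialized before OR-ing in the shrink flag */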
+ if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
+ oa->o_valid |= OBD_MD_FLFLAGS;
+ oa->o_flags = 0;
+ }
oa->o_flags |= OBD_FL_SHRINK_GRANT;
osc_update_next_shrink(cli);
}
body->oa.o_grant = cli->cl_avail_grant - target;
cli->cl_avail_grant = target;
client_obd_list_unlock(&cli->cl_loi_list_lock);
+ if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
+ body->oa.o_valid |= OBD_MD_FLFLAGS;
+ body->oa.o_flags = 0;
+ }
body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
osc_update_next_shrink(cli);
struct lov_stripe_md *lsm, obd_count page_count,
struct brw_page **pga,
struct ptlrpc_request **reqp,
- struct obd_capa *ocapa, int reserve)
+ struct obd_capa *ocapa, int reserve,
+ int resend)
{
struct ptlrpc_request *req;
struct ptlrpc_bulk_desc *desc;
&RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
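+ /* tag resent requests so the OST can recognize a bulk that was
+ * resent after an error or during recovery */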
+ if (resend) {
+ if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+ body->oa.o_valid |= OBD_MD_FLFLAGS;
+ body->oa.o_flags = 0;
+ }
+ body->oa.o_flags |= OBD_FL_RECOV_RESEND;
+ }
+
if (osc_should_shrink_grant(cli))
osc_shrink_grant_local(cli, &body->oa);
restart_bulk:
rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
- page_count, pga, &req, ocapa, 0);
+ page_count, pga, &req, ocapa, 0, resends);
if (rc != 0)
return (rc);
aa->aa_cli, aa->aa_oa,
NULL /* lsm unused by osc currently */,
aa->aa_page_count, aa->aa_ppga,
- &new_req, aa->aa_ocapa, 0);
+ &new_req, aa->aa_ocapa, 0, 1);
if (rc)
RETURN(rc);
osc_release_write_grant(cli, &oap->oap_brw_page, sent);
}
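+
+/* Return 1 if the oap at the head of the urgent list was queued by
+ * sync_fs, which forces an RPC for this lop. */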
+static int lop_makes_syncfs_rpc(struct loi_oap_pages *lop)
+{
+ struct osc_async_page *oap;
+ ENTRY;
+
+ if (cfs_list_empty(&lop->lop_urgent))
+ RETURN(0);
+
+ oap = cfs_list_entry(lop->lop_urgent.next,
+ struct osc_async_page, oap_urgent_item);
+
+ if (oap->oap_async_flags & ASYNC_SYNCFS) {
+ CDEBUG(D_CACHE, "syncfs request forcing RPC\n");
+ RETURN(1);
+ }
+
+ RETURN(0);
+}
/* This maintains the lists of pending pages to read/write for a given object
* (lop). This is used by osc_check_rpcs->osc_next_loi() and loi_list_maint()
on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list, 0);
on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 1);
} else {
- on_list(&loi->loi_hp_ready_item, &cli->cl_loi_hp_ready_list, 0);
- on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
- lop_makes_rpc(cli, &loi->loi_write_lop, OBD_BRW_WRITE)||
- lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ));
+ if (lop_makes_syncfs_rpc(&loi->loi_write_lop)) {
+ on_list(&loi->loi_sync_fs_item,
+ &cli->cl_loi_sync_fs_list,
+ loi->loi_write_lop.lop_num_pending);
+ } else {
+ on_list(&loi->loi_hp_ready_item,
+ &cli->cl_loi_hp_ready_list, 0);
+ on_list(&loi->loi_ready_item, &cli->cl_loi_ready_list,
+ lop_makes_rpc(cli, &loi->loi_write_lop,
+ OBD_BRW_WRITE) ||
+ lop_makes_rpc(cli, &loi->loi_read_lop,
+ OBD_BRW_READ));
+ }
}
on_list(&loi->loi_write_item, &cli->cl_loi_write_list,
ar->ar_force_sync = 0;
}
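+
+/* Add @oap to @lop's urgent list according to its async flags.
+ * Returns 1 if the oap was already queued there, 0 on success. */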
+static int osc_add_to_lop_urgent(struct loi_oap_pages *lop,
+ struct osc_async_page *oap,
+ obd_flag async_flags)
+{
+ /* nothing to do if the oap is already on the lop urgent list */
+ if (!cfs_list_empty(&oap->oap_urgent_item)) {
+ CWARN("request to re-add oap to urgent list, flags = %x\n",
+ oap->oap_async_flags);
+ return 1;
+ }
+
+ /* item queued by sync_fs; the emptiness check above already
+ * excludes duplicates */
+ if (async_flags & ASYNC_SYNCFS) {
+ cfs_list_add_tail(&oap->oap_urgent_item,
+ &lop->lop_urgent);
+ return 0;
+ }
+
+ if (oap->oap_async_flags & ASYNC_HP)
+ cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
+ else if (oap->oap_async_flags & ASYNC_URGENT ||
+ async_flags & ASYNC_URGENT)
+ cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+
+ return 0;
+}
+
void osc_oap_to_pending(struct osc_async_page *oap)
{
struct loi_oap_pages *lop;
else
lop = &oap->oap_loi->loi_read_lop;
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- else if (oap->oap_async_flags & ASYNC_URGENT)
- cfs_list_add_tail(&oap->oap_urgent_item, &lop->lop_urgent);
+ osc_add_to_lop_urgent(lop, oap, 0);
cfs_list_add_tail(&oap->oap_pending_item, &lop->lop_pending);
lop_update_pending(oap->oap_cli, lop, oap->oap_cmd, 1);
}
osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1);
}
osc_wake_cache_waiters(cli);
+ osc_wake_sync_fs(cli);
osc_check_rpcs(env, cli);
client_obd_list_unlock(&cli->cl_loi_list_lock);
if (!async)
sort_brw_pages(pga, page_count);
rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
- pga, &req, crattr.cra_capa, 1);
+ pga, &req, crattr.cra_capa, 1, 0);
if (rc != 0) {
CERROR("prep_req failed: %d\n", rc);
GOTO(out, req = ERR_PTR(rc));
}
osc_wake_cache_waiters(cli);
-
+ osc_wake_sync_fs(cli);
loi_list_maint(cli, loi);
client_obd_list_unlock(&cli->cl_loi_list_lock);
if (!cfs_list_empty(&cli->cl_loi_ready_list))
RETURN(cfs_list_entry(cli->cl_loi_ready_list.next,
struct lov_oinfo, loi_ready_item));
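+ /* then return objects queued by sync_fs */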
+ if (!cfs_list_empty(&cli->cl_loi_sync_fs_list))
+ RETURN(cfs_list_entry(cli->cl_loi_sync_fs_list.next,
+ struct lov_oinfo, loi_sync_fs_item));
/* then if we have cache waiters, return all objects with queued
* writes. This is especially important when many small files
cfs_list_del_init(&loi->loi_write_item);
if (!cfs_list_empty(&loi->loi_read_item))
cfs_list_del_init(&loi->loi_read_item);
+ if (!cfs_list_empty(&loi->loi_sync_fs_item))
+ cfs_list_del_init(&loi->loi_sync_fs_item);
loi_list_maint(cli, loi);
if ((oap->oap_async_flags & async_flags) == async_flags)
RETURN(0);
+ /* XXX: this introduces a small, benign race if the loi already
+ * had other urgent items queued.
+ */
+ if (SETTING(oap->oap_async_flags, async_flags, ASYNC_SYNCFS) &&
+ cfs_list_empty(&oap->oap_rpc_item) &&
+ cfs_list_empty(&oap->oap_urgent_item)) {
+ osc_add_to_lop_urgent(lop, oap, ASYNC_SYNCFS);
+ flags |= ASYNC_SYNCFS;
+ cfs_spin_lock(&oap->oap_lock);
+ oap->oap_async_flags |= flags;
+ cfs_spin_unlock(&oap->oap_lock);
+ loi_list_maint(cli, loi);
+ RETURN(0);
+ }
+
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_READY))
flags |= ASYNC_READY;
if (SETTING(oap->oap_async_flags, async_flags, ASYNC_URGENT) &&
cfs_list_empty(&oap->oap_rpc_item)) {
- if (oap->oap_async_flags & ASYNC_HP)
- cfs_list_add(&oap->oap_urgent_item, &lop->lop_urgent);
- else
- cfs_list_add_tail(&oap->oap_urgent_item,
- &lop->lop_urgent);
+ osc_add_to_lop_urgent(lop, oap, ASYNC_URGENT);
flags |= ASYNC_URGENT;
loi_list_maint(cli, loi);
}
if (!cfs_list_empty(&oap->oap_urgent_item)) {
cfs_list_del_init(&oap->oap_urgent_item);
cfs_spin_lock(&oap->oap_lock);
- oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP);
+ oap->oap_async_flags &= ~(ASYNC_URGENT | ASYNC_HP |
+ ASYNC_SYNCFS);
cfs_spin_unlock(&oap->oap_lock);
}
if (!cfs_list_empty(&oap->oap_pending_item)) {
RETURN(rc);
}
+/**
+ * Determine whether a lock can be canceled rather than replayed during
+ * recovery; see bug 16774 for details.
+ *
+ * \retval zero the lock cannot be canceled
+ * \retval other ok to cancel
+ */
+static int osc_cancel_for_recovery(struct ldlm_lock *lock)
+{
+ check_res_locked(lock->l_resource);
+
+ /*
+ * Cancel all unused extent locks granted in LCK_PR or LCK_CR mode.
+ *
+ * XXX as a future improvement, we could also cancel unused write
+ * locks that have no dirty data and no active mmaps.
+ */
+ if (lock->l_resource->lr_type == LDLM_EXTENT &&
+ (lock->l_granted_mode == LCK_PR ||
+ lock->l_granted_mode == LCK_CR) &&
+ (osc_dlm_lock_pageref(lock) == 0))
+ RETURN(1);
+
+ RETURN(0);
+}
+
int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
{
int rc;
CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
cfs_sema_init(&cli->cl_grant_sem, 1);
+
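+ /* register so recovery can cancel unused read locks instead of
+ * replaying them */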
+ ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
}
RETURN(rc);
return(rc);
}
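+
+/* Flush dirty pages for this client: flag every pending write oap
+ * ASYNC_SYNCFS so the lois migrate to cl_loi_sync_fs_list; when that
+ * list drains, osc_wake_sync_fs() fires the caller's upcall. */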
+static int osc_sync_fs(struct obd_export *exp, struct obd_info *oinfo,
+ int wait)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct client_obd *cli;
+ struct lov_oinfo *loi;
+ struct lov_oinfo *tloi;
+ struct osc_async_page *oap;
+ struct osc_async_page *toap;
+ struct loi_oap_pages *lop;
+ struct lu_env *env;
+ int refcheck;
+ int rc = 0;
+ ENTRY;
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
+
+ cli = &obd->u.cli;
+ client_obd_list_lock(&cli->cl_loi_list_lock);
+ cli->cl_sf_wait.sfw_oi = oinfo;
+ cli->cl_sf_wait.sfw_upcall = oinfo->oi_cb_up;
+ cli->cl_sf_wait.started = 1;
+ /* flag every pending write oap ASYNC_SYNCFS; loi_list_maint()
+ * then moves the loi onto cl_loi_sync_fs_list */
+ cfs_list_for_each_entry_safe(loi, tloi, &cli->cl_loi_write_list,
+ loi_write_item) {
+ lop = &loi->loi_write_lop;
+ cfs_list_for_each_entry_safe(oap, toap, &lop->lop_pending,
+ oap_pending_item)
+ osc_set_async_flags_base(cli, loi, oap, ASYNC_SYNCFS);
+ }
+ osc_check_rpcs(env, cli);
+ osc_wake_sync_fs(cli);
+ client_obd_list_unlock(&cli->cl_loi_list_lock);
+ cl_env_put(env, &refcheck);
+
+ RETURN(rc);
+}
+
static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
{
return osc_process_config_base(obd, buf);
.o_llog_init = osc_llog_init,
.o_llog_finish = osc_llog_finish,
.o_process_config = osc_process_config,
+ .o_sync_fs = osc_sync_fs,
};
extern struct lu_kmem_descr osc_caches[];