if (osc_extent_merge(env, ext, next_extent(ext)) == 0)
grant += cli->cl_grant_extent_tax;
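+	/*
+	 * If the DLM lock covering this write extent has a blocking
+	 * AST pending, flush the extent with high priority so the
+	 * lock can be cancelled as soon as possible.
+	 */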
+ if (!ext->oe_rw && ext->oe_dlmlock) {
+ bool hp;
+
+ lock_res_and_lock(ext->oe_dlmlock);
+ hp = ldlm_is_cbpending(ext->oe_dlmlock);
+ unlock_res_and_lock(ext->oe_dlmlock);
+
+ /* HP extent should be written ASAP. */
+ if (hp)
+ ext->oe_hp = 1;
+ }
+
if (ext->oe_hp)
list_move_tail(&ext->oe_link,
&obj->oo_hp_exts);
ext->oe_end >= cur->oe_end),
ext, EXTSTR"\n", EXTPARA(cur));
- if (ext->oe_state > OES_CACHE || ext->oe_fsync_wait) {
+ if (ext->oe_state > OES_CACHE || ext->oe_hp ||
+ ext->oe_fsync_wait) {
/* for simplicity, we wait for this extent to
* finish before going forward. */
conflict = osc_extent_get(ext);
}
/* non-overlapped extent */
- if (ext->oe_state != OES_CACHE || ext->oe_fsync_wait)
+ if (ext->oe_state != OES_CACHE || ext->oe_hp ||
+ ext->oe_fsync_wait)
/* we can't do anything for a non OES_CACHE extent, or
* if there is someone waiting for this extent to be
* flushed, try next one. */
return rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight + hprpc;
}
+/* Check whether all I/O RPC slots are used up by parallel DIO. */
+static inline bool osc_full_dio_in_flight(struct client_obd *cli)
+{
+ __u32 rpcs = rpcs_in_flight(cli);
+
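+	/*
+	 * Every RPC slot is occupied and the number of RPCs in flight
+	 * does not exceed the parallel DIO count, i.e. all slots are
+	 * used by parallel DIO.
+	 */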
+ return rpcs >= cli->cl_max_rpcs_in_flight &&
+ rpcs <= cli->cl_d_in_flight;
+}
+
/* This maintains the lists of pending pages to read/write for a given object
* (lop). This is used by osc_check_rpcs->osc_next_obj() and osc_list_maint()
* to quickly find objects that are ready to send an RPC. */
CDEBUG(D_CACHE, "invalid import forcing RPC\n");
RETURN(1);
}
+ if (!list_empty(&osc->oo_hp_read_exts)) {
+ CDEBUG(D_CACHE, "high prio read request forcing RPC\n");
+ RETURN(1);
+ }
/* all read are urgent. */
if (!list_empty(&osc->oo_reading_exts))
RETURN(1);
OSC_IO_DEBUG(obj, "update pending cmd %d delta %d.\n", cmd, delta);
}
-static int osc_makes_hprpc(struct osc_object *obj)
+static bool osc_makes_hprpc(struct osc_object *obj)
{
- return !list_empty(&obj->oo_hp_exts);
+ return !list_empty(&obj->oo_hp_exts) ||
+ !list_empty(&obj->oo_hp_read_exts);
}
static void on_list(struct list_head *item, struct list_head *list,
RETURN(rc);
}
+static unsigned int get_read_extents(struct osc_object *obj,
+ struct list_head *rpclist)
+{
+ struct client_obd *cli = osc_cli(obj);
+ struct osc_extent *ext;
+ struct osc_extent *next;
+ struct extent_rpc_data data = {
+ .erd_rpc_list = rpclist,
+ .erd_page_count = 0,
+ .erd_max_pages = cli->cl_max_pages_per_rpc,
+ .erd_max_chunks = UINT_MAX,
+ .erd_max_extents = UINT_MAX,
+ };
+
+ assert_osc_object_is_locked(obj);
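+	/* Pick up the already queued high priority read extents first. */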
+ while ((ext = list_first_entry_or_null(&obj->oo_hp_read_exts,
+ struct osc_extent,
+ oe_link)) != NULL) {
+ EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
+ if (!try_to_add_extent_for_io(cli, ext, &data))
+ return data.erd_page_count;
+ osc_extent_state_set(ext, OES_RPC);
+ EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
+ }
+ if (data.erd_page_count == data.erd_max_pages)
+ return data.erd_page_count;
+
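+	/* Fill the remaining RPC slots with normal read extents. */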
+ list_for_each_entry_safe(ext, next, &obj->oo_reading_exts, oe_link) {
+ EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
+ if (!try_to_add_extent_for_io(cli, ext, &data))
+ break;
+ osc_extent_state_set(ext, OES_RPC);
+ EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
+ }
+
+ LASSERT(data.erd_page_count <= data.erd_max_pages);
+ return data.erd_page_count;
+}
+
/**
* prepare pages for ASYNC io and put pages in send queue.
*
struct osc_object *osc)
__must_hold(osc)
{
- struct osc_extent *ext;
- struct osc_extent *next;
LIST_HEAD(rpclist);
- struct extent_rpc_data data = {
- .erd_rpc_list = &rpclist,
- .erd_page_count = 0,
- .erd_max_pages = cli->cl_max_pages_per_rpc,
- .erd_max_chunks = UINT_MAX,
- .erd_max_extents = UINT_MAX,
- };
+ unsigned int page_count;
int rc = 0;
+
ENTRY;
assert_osc_object_is_locked(osc);
- list_for_each_entry_safe(ext, next, &osc->oo_reading_exts, oe_link) {
- EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
- if (!try_to_add_extent_for_io(cli, ext, &data))
- break;
- osc_extent_state_set(ext, OES_RPC);
- EASSERT(ext->oe_nr_pages <= data.erd_max_pages, ext);
- }
- LASSERT(data.erd_page_count <= data.erd_max_pages);
+ page_count = get_read_extents(osc, &rpclist);
- osc_update_pending(osc, OBD_BRW_READ, -data.erd_page_count);
+ osc_update_pending(osc, OBD_BRW_READ, -page_count);
if (!list_empty(&rpclist)) {
osc_object_unlock(osc);
* starvation and leading to server evicting us for not
* writing out pages in a timely manner LU-13131 */
if (osc_max_rpc_in_flight(cli, osc) &&
- list_empty(&osc->oo_hp_exts)) {
+ list_empty(&osc->oo_hp_exts) &&
+ list_empty(&osc->oo_hp_read_exts)) {
__osc_list_maint(cli, osc);
break;
}
bool can_merge = true;
pgoff_t start = CL_PAGE_EOF;
pgoff_t end = 0;
- struct osc_lock *oscl;
+
ENTRY;
list_for_each_entry(oap, list, oap_pending_item) {
anchor = clpage->cp_sync_io;
ext->oe_csd = anchor->csi_dio_aio;
}
- oscl = oio->oi_write_osclock ? : oio->oi_read_osclock;
- if (oscl && oscl->ols_dlmlock != NULL) {
- ext->oe_dlmlock = ldlm_lock_get(oscl->ols_dlmlock);
- }
if (ext->oe_dio && !ext->oe_rw) { /* direct io write */
int grants;
int ppc;
}
osc_update_pending(obj, OBD_BRW_WRITE, page_count);
} else {
- list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
+ bool hp_read = false;
+ struct ldlm_lock *dlmlock;
+ struct osc_lock *oscl;
+
+		/*
+		 * If the DLM extent lock is under a blocking AST, give
+		 * this I/O high priority.
+		 */
+
+ oscl = oio->oi_read_osclock ? : oio->oi_write_osclock;
+ dlmlock = oscl ? oscl->ols_dlmlock : NULL;
+
+ if (dlmlock == NULL && !ext->oe_srvlock) {
+ CDEBUG(D_CACHE,
+ "NOLCK: io %pK "EXTSTR" dio: %d srvlock: %d\n",
+ io, EXTPARA(ext), ext->oe_dio, ext->oe_srvlock);
+ }
+ if (!ext->oe_srvlock && dlmlock != NULL) {
+ lock_res_and_lock(dlmlock);
+ hp_read = ldlm_is_cbpending(dlmlock);
+ unlock_res_and_lock(dlmlock);
+ if (hp_read)
+ CDEBUG(D_CACHE,
+ "HP read: io %pK ext@%pK "EXTSTR"\n",
+ io, ext, EXTPARA(ext));
+ }
+
+ if (hp_read)
+ list_add_tail(&ext->oe_link, &obj->oo_hp_read_exts);
+ else
+ list_add_tail(&ext->oe_link, &obj->oo_reading_exts);
osc_update_pending(obj, OBD_BRW_READ, page_count);
}
+
+ OSC_EXTENT_DUMP(D_CACHE, ext, "allocate ext: rw=%d\n", ext->oe_rw);
osc_object_unlock(obj);
osc_io_unplug_async(env, cli, obj);
result += ext->oe_nr_pages;
if (!discard) {
struct list_head *list = NULL;
- if (hp) {
- EASSERT(!ext->oe_hp, ext);
+
+ if (ext->oe_hp) {
+			/*
+			 * The extent has already been added to the
+			 * HP list; another thread is writing it
+			 * back with high priority.
+			 */
+ unplug = true;
+ break;
+ } else if (hp) {
ext->oe_hp = 1;
list = &obj->oo_hp_exts;
} else if (!ext->oe_urgent && !ext->oe_hp) {
RETURN(result);
}
+int osc_ldlm_hp_handle(const struct lu_env *env, struct osc_object *obj,
+ pgoff_t start, pgoff_t end, bool read_check_only)
+{
+ struct client_obd *cli = osc_cli(obj);
+ struct osc_extent *ext;
+ struct osc_extent *next;
+ bool no_rpc_slots = false;
+ bool unplug = false;
+
+ ENTRY;
+
+ spin_lock(&cli->cl_loi_list_lock);
+ no_rpc_slots = osc_full_dio_in_flight(cli);
+ spin_unlock(&cli->cl_loi_list_lock);
+
+	/*
+	 * Currently we only apply high priority handling when all I/O
+	 * RPC slots are used up by parallel DIO and there are
+	 * conflicting I/O extents under a lock blocking AST.
+	 * TODO: Send all I/Os for the object corresponding to the lock
+	 * in blocking AST to the OSTs with higher priority, instead of
+	 * iterating over all OSC objects one by one; the conflicting
+	 * I/O could then be handled, and the lock held by this I/O
+	 * released, more quickly.
+	 */
+
+ CDEBUG(D_CACHE,
+ "High prio I/O check: start %lu end %lu RPC(%d):r%u/w%u/d%u\n",
+ start, end, no_rpc_slots, cli->cl_r_in_flight,
+ cli->cl_w_in_flight, cli->cl_d_in_flight);
+ osc_object_lock(obj);
+ /* Check buffered read extents. */
+ list_for_each_entry_safe(ext, next, &obj->oo_reading_exts, oe_link) {
+ EASSERT(ext->oe_state == OES_LOCK_DONE, ext);
+ if (ext->oe_end < start || ext->oe_start > end)
+ continue;
+ if (ext->oe_dio || ext->oe_srvlock)
+ continue;
+
+ list_move_tail(&ext->oe_link, &obj->oo_hp_read_exts);
+ OSC_EXTENT_DUMP(D_CACHE, ext, "HP read this extent\n");
+ unplug = true;
+ }
+
+ if (read_check_only)
+ GOTO(out_unlock, unplug);
+
+ /* Check buffered write extents. */
+ ext = osc_extent_search(obj, start);
+ if (ext == NULL)
+ ext = first_extent(obj);
+ else if (ext->oe_end < start)
+ ext = next_extent(ext);
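+	/* Walk all write extents overlapping [start, end]. */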
+ while (ext != NULL) {
+ if (ext->oe_start > end)
+ break;
+
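+		/* A blocking AST now waits for this extent to be flushed. */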
+ ext->oe_fsync_wait = 1;
+ switch (ext->oe_state) {
+ case OES_CACHE:
+			/*
+			 * An extent already marked HP (oe_hp) is being
+			 * written back by another thread; DIO and
+			 * server-lock extents are skipped as well.
+			 */
+ if (ext->oe_hp || ext->oe_dio || ext->oe_srvlock)
+ break;
+
+ ext->oe_hp = 1;
+ list_move_tail(&ext->oe_link, &obj->oo_hp_exts);
+ OSC_EXTENT_DUMP(D_CACHE, ext, "HP write this extent\n");
+ unplug = true;
+ break;
+ case OES_ACTIVE:
+			/*
+			 * It is pretty bad to wait for ACTIVE extents,
+			 * because we do not know how long the flush will
+			 * take: the extent may be blocked waiting for more
+			 * grants. We do this for the correctness of fsync.
+			 */
+ ext->oe_urgent = 1;
+ break;
+ default:
+ break;
+ }
+ ext = next_extent(ext);
+ }
+
+out_unlock:
+ osc_object_unlock(obj);
+
+ if (unplug)
+ osc_io_unplug(env, cli, obj);
+
+ RETURN(0);
+}
/** @} osc */
local mntpt="/mnt/pcc.$tdir"
local hsm_root="$mntpt/$tdir"
local file=$DIR/$tfile
+ local cnt=50
$LCTL get_param -n mdc.*.connect_flags | grep -q pcc_ro ||
skip "Server does not support PCC-RO"
"projid={0}\ roid=$HSM_ARCHIVE_NUMBER\ pccro=1"
do_facet $SINGLEAGT $LCTL pcc list $MOUNT
- do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1M count=50 ||
+ do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1M count=$cnt ||
error "Write $file failed"
local rpid
local dpid
local lpcc_path
+ local lckf=$DIR/$tfile.lck
+
+ rm -f $lckf
lpcc_path=$(lpcc_fid2path $hsm_root $file)
(
- while [ ! -e $DIR/sanity-pcc.99.lck ]; do
- do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1M count=50 conv=notrunc ||
+ while [ ! -e $lckf ]; do
+ do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1M count=$cnt conv=notrunc || {
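+			# create the lock file so the sibling loops also exit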
+ touch $lckf
error "failed to write $file"
+ }
sleep 0.$((RANDOM % 4 + 1))
done
)&
wpid=$!
(
- while [ ! -e $DIR/sanity-pcc.99.lck ]; do
- do_facet $SINGLEAGT dd if=$file of=/dev/null bs=1M count=50 ||
- error "failed to write $file"
+ while [ ! -e $lckf ]; do
+ echo "Read $file ..."
+ do_facet $SINGLEAGT dd if=$file of=/dev/null bs=1M count=$cnt ||
+ error "failed to read $file"
sleep 0.$((RANDOM % 4 + 1))
done
)&
rpid=$!
(
- while [ ! -e $DIR/sanity-pcc.99.lck ]; do
+ while [ ! -e $lckf ]; do
do_facet $SINGLEAGT $MMAP_CAT $file > /dev/null ||
error "failed to mmap_cat $file"
sleep 0.$((RANDOM % 4 + 1))
rpid2=$!
(
- while [ ! -e $DIR/sanity-pcc.99.lck ]; do
+ while [ ! -e $lckf ]; do
echo "Unlink $lpcc_path"
do_facet $SINGLEAGT unlink $lpcc_path
sleep 1
upid=$!
(
- while [ ! -e $DIR/sanity-pcc.99.lck ]; do
+ while [ ! -e $lckf ]; do
echo "Detach $file ..."
do_facet $SINGLEAGT $LFS pcc detach $file
sleep 0.$((RANDOM % 8 + 1))
dpid=$!
sleep 60
- stack_trap "rm -f $DIR/sanity-pcc.99.lck"
- touch $DIR/sanity-pcc.99.lck
+ echo "==== DONE ===="
+ stack_trap "rm -f $lckf"
+ touch $lckf
wait $wpid || error "$?: write failed"
wait $rpid || error "$?: read failed"
- wait $rpid2 || error "$?: read2 failed"
+ wait $rpid2 || error "$?: mmap read2 failed"
wait $upid || error "$?: unlink failed"
wait $dpid || error "$?: detach failed"
+	echo "==== DONE WAIT ===="
+	$LCTL get_param osc.*.rpc_stats
do_facet $SINGLEAGT $LFS pcc detach $file
- rm -f $DIR/sanity-pcc.99.lck
+ rm -f $lckf
}
run_test 99 "race among unlink | mmap read | write | detach for PCC-RO file"
+test_99b() {
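+	# Like test_99, but replaces the mmap reader with a second
+	# buffered dd reader to race two concurrent buffered readers.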
+ local loopfile="$TMP/$tfile"
+ local mntpt="/mnt/pcc.$tdir"
+ local hsm_root="$mntpt/$tdir"
+ local file=$DIR/$tfile
+ local cnt=50
+
+ setup_loopdev $SINGLEAGT $loopfile $mntpt 200
+ do_facet $SINGLEAGT mkdir $hsm_root || error "mkdir $hsm_root failed"
+ setup_pcc_mapping $SINGLEAGT \
+ "projid={0}\ roid=$HSM_ARCHIVE_NUMBER\ pccro=1"
+ do_facet $SINGLEAGT $LCTL pcc list $MOUNT
+
+ do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1M count=$cnt ||
+ error "Write $file failed"
+
+ local rpid
+ local rpid2
+ local wpid
+ local upid
+ local dpid
+ local lpcc_path
+
+ local lckf=$DIR/$tfile.lck
+
+ rm -f $lckf
+ lpcc_path=$(lpcc_fid2path $hsm_root $file)
+ (
+ while [ ! -e $lckf ]; do
+ do_facet $SINGLEAGT dd if=/dev/zero of=$file bs=1M count=$cnt conv=notrunc || {
+ touch $lckf
+ error "failed to write $file"
+ }
+ sleep 0.$((RANDOM % 4 + 1))
+ done
+ )&
+ wpid=$!
+
+ (
+ while [ ! -e $lckf ]; do
+ echo "Read $file ..."
+ do_facet $SINGLEAGT dd if=$file of=/dev/null bs=1M count=$cnt ||
+ error "failed to read $file (1)"
+ sleep 0.$((RANDOM % 4 + 1))
+ done
+ )&
+ rpid=$!
+
+ (
+ while [ ! -e $lckf ]; do
+ do_facet $SINGLEAGT dd if=$file of=/dev/null bs=1M count=$cnt ||
+ error "failed to read $file (2)"
+ sleep 0.$((RANDOM % 4 + 1))
+ done
+ )&
+ rpid2=$!
+
+ (
+ while [ ! -e $lckf ]; do
+ echo "Unlink $lpcc_path"
+ do_facet $SINGLEAGT unlink $lpcc_path
+ sleep 1
+ done
+ true
+ )&
+ upid=$!
+
+ (
+ while [ ! -e $lckf ]; do
+ echo "Detach $file ..."
+ do_facet $SINGLEAGT $LFS pcc detach $file
+ sleep 0.$((RANDOM % 8 + 1))
+ done
+ )&
+ dpid=$!
+
+ sleep 60
+ echo "==== DONE ===="
+ touch $lckf
+ wait $wpid || error "$?: write failed"
+ wait $rpid || error "$?: read failed"
+ wait $rpid2 || error "$?: read2 failed"
+ wait $upid || error "$?: unlink failed"
+ wait $dpid || error "$?: detach failed"
+
+	echo "==== DONE WAIT ===="
+	$LCTL get_param osc.*.rpc_stats
+ do_facet $SINGLEAGT $LFS pcc detach $file
+ rm -f $lckf
+}
+run_test 99b "race among unlink | two readers | write | detach for PCC-RO file"
+
test_100() {
local loopfile="$TMP/$tfile"
local mntpt="/mnt/pcc.$tdir"