-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
* GPL HEADER START
*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
/*
* Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
* Client IO.
*
* Author: Nikita Danilov <nikita.danilov@sun.com>
+ * Author: Jinshan Xiong <jinshan.xiong@intel.com>
*/
#define DEBUG_SUBSYSTEM S_CLASS
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_fid.h>
#include <libcfs/list.h>
-/* lu_time_global_{init,fini}() */
-#include <lu_time.h>
-
#include <cl_object.h>
#include "cl_internal.h"
*/
void cl_io_fini(const struct lu_env *env, struct cl_io *io)
{
- struct cl_io_slice *slice;
- struct cl_thread_info *info;
+ struct cl_io_slice *slice;
+ struct cl_thread_info *info;
LINVRNT(cl_io_type_is_valid(io->ci_type));
LINVRNT(cl_io_invariant(io));
ENTRY;
while (!cfs_list_empty(&io->ci_layers)) {
- slice = container_of(io->ci_layers.next, struct cl_io_slice,
+ slice = container_of(io->ci_layers.prev, struct cl_io_slice,
cis_linkage);
cfs_list_del_init(&slice->cis_linkage);
if (slice->cis_iop->op[io->ci_type].cio_fini != NULL)
info = cl_env_info(env);
if (info->clt_current_io == io)
info->clt_current_io = NULL;
- EXIT;
+
+ /* sanity check for layout change */
+ switch(io->ci_type) {
+ case CIT_READ:
+ case CIT_WRITE:
+ break;
+ case CIT_FAULT:
+ case CIT_FSYNC:
+ LASSERT(!io->ci_need_restart);
+ break;
+ case CIT_SETATTR:
+ case CIT_MISC:
+ /* Check ignore layout change conf */
+ LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout,
+ !io->ci_need_restart));
+ break;
+ default:
+ LBUG();
+ }
+ EXIT;
}
EXPORT_SYMBOL(cl_io_fini);
return lu_object_fid(&descr->cld_obj->co_lu);
}
-static int cl_lock_descr_cmp(const struct cl_lock_descr *d0,
- const struct cl_lock_descr *d1)
+static int cl_lock_descr_sort(const struct cl_lock_descr *d0,
+ const struct cl_lock_descr *d1)
{
return lu_fid_cmp(cl_lock_descr_fid(d0), cl_lock_descr_fid(d1)) ?:
__diff_normalize(d0->cld_start, d1->cld_start);
}
+/*
+ * Three-way comparison of two lock descriptors: by object fid first, then
+ * by their [cld_start, cld_end] ranges.
+ *
+ * Returns the lu_fid_cmp() result when it is non-zero. For equal fids
+ * returns -1 when d0 ends before d1 starts, +1 when d0 starts after d1
+ * ends, and 0 when the two ranges overlap.
+ */
+static int cl_lock_descr_cmp(const struct cl_lock_descr *d0,
+			     const struct cl_lock_descr *d1)
+{
+	int ret;
+
+	ret = lu_fid_cmp(cl_lock_descr_fid(d0), cl_lock_descr_fid(d1));
+	if (ret)
+		return ret;
+	if (d0->cld_end < d1->cld_start)
+		return -1;
+	/*
+	 * Compare against d1's end. Testing d0's start against d0's own end
+	 * can never detect a d0 that lies entirely above d1, which would
+	 * make disjoint ranges look overlapping.
+	 */
+	if (d0->cld_start > d1->cld_end)
+		return 1;
+	return 0;
+}
+
+/*
+ * Folds descriptor d1 into d0: d0's range becomes the union of both
+ * ranges, and d0 takes over d1's mode when that mode is CLM_WRITE or
+ * CLM_GROUP. Any other d1 mode leaves d0's mode untouched.
+ */
+static void cl_lock_descr_merge(struct cl_lock_descr *d0,
+				const struct cl_lock_descr *d1)
+{
+	/* widen d0 so that it also covers d1 */
+	if (d1->cld_start < d0->cld_start)
+		d0->cld_start = d1->cld_start;
+	if (d1->cld_end > d0->cld_end)
+		d0->cld_end = d1->cld_end;
+
+	/* only CLM_WRITE and CLM_GROUP requests change the merged mode */
+	switch (d1->cld_mode) {
+	case CLM_WRITE:
+	case CLM_GROUP:
+		d0->cld_mode = d1->cld_mode;
+		break;
+	default:
+		break;
+	}
+}
+
/*
* Sort locks in lexicographical order of their (fid, start-offset) pairs.
*/
&io->ci_lockset.cls_todo,
cill_linkage) {
if (prev != NULL) {
- switch (cl_lock_descr_cmp(&prev->cill_descr,
+ switch (cl_lock_descr_sort(&prev->cill_descr,
&curr->cill_descr)) {
case 0:
/*
if (cl_lock_descr_match(&scan->cill_descr, need))
RETURN(+1);
}
- return 0;
+ RETURN(0);
}
EXPORT_SYMBOL(cl_queue_match);
+/*
+ * Scans \a queue for the first lock link whose descriptor compares equal
+ * (cl_lock_descr_cmp() == 0) to \a need and merges \a need into it.
+ *
+ * Returns +1 when a link absorbed \a need, 0 when no link matched. Note
+ * that the matched link's descriptor is modified even though \a queue is
+ * const-qualified.
+ */
+static int cl_queue_merge(const cfs_list_t *queue,
+			  const struct cl_lock_descr *need)
+{
+	struct cl_io_lock_link *scan;
+
+	ENTRY;
+	cfs_list_for_each_entry(scan, queue, cill_linkage) {
+		struct cl_lock_descr *descr = &scan->cill_descr;
+
+		if (cl_lock_descr_cmp(descr, need) == 0) {
+			cl_lock_descr_merge(descr, need);
+			CDEBUG(D_VFSTRACE, "lock: %d: [%lu, %lu]\n",
+			       descr->cld_mode, descr->cld_start,
+			       descr->cld_end);
+			RETURN(+1);
+		}
+	}
+	RETURN(0);
+}
+
static int cl_lockset_match(const struct cl_lockset *set,
- const struct cl_lock_descr *need, int all_queues)
+ const struct cl_lock_descr *need)
+{
+ return cl_queue_match(&set->cls_curr, need) ||
+ cl_queue_match(&set->cls_done, need);
+}
+
+static int cl_lockset_merge(const struct cl_lockset *set,
+ const struct cl_lock_descr *need)
{
- return (all_queues ? cl_queue_match(&set->cls_todo, need) : 0) ||
- cl_queue_match(&set->cls_curr, need) ||
- cl_queue_match(&set->cls_done, need);
+ return cl_queue_merge(&set->cls_todo, need) ||
+ cl_lockset_match(set, need);
}
static int cl_lockset_lock_one(const struct lu_env *env,
ENTRY;
- lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
+ lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
+
if (!IS_ERR(lock)) {
link->cill_lock = lock;
cfs_list_move(&link->cill_linkage, &set->cls_curr);
ENTRY;
result = 0;
cfs_list_for_each_entry_safe(link, temp, &set->cls_todo, cill_linkage) {
- if (!cl_lockset_match(set, &link->cill_descr, 0)) {
+ if (!cl_lockset_match(set, &link->cill_descr)) {
/* XXX some locking to guarantee that locks aren't
* expanded in between. */
result = cl_lockset_lock_one(env, io, set, link);
int result;
ENTRY;
- if (cl_lockset_match(&io->ci_lockset, &link->cill_descr, 1))
+ if (cl_lockset_merge(&io->ci_lockset, &link->cill_descr))
result = +1;
else {
cfs_list_add(&link->cill_linkage, &io->ci_lockset.cls_todo);
}
}
if (result == 0)
- result = cl_io_submit_rw(env, io, CRT_READ, queue, CRP_NORMAL);
+ result = cl_io_submit_rw(env, io, CRT_READ, queue);
/*
* Unlock unsent pages in case of error.
*/
* \see cl_io_operations::cio_submit()
*/
int cl_io_submit_rw(const struct lu_env *env, struct cl_io *io,
- enum cl_req_type crt, struct cl_2queue *queue,
- enum cl_req_priority priority)
+ enum cl_req_type crt, struct cl_2queue *queue)
{
const struct cl_io_slice *scan;
int result = 0;
if (scan->cis_iop->req_op[crt].cio_submit == NULL)
continue;
result = scan->cis_iop->req_op[crt].cio_submit(env, scan, crt,
- queue, priority);
+ queue);
if (result != 0)
break;
}
*/
int cl_io_submit_sync(const struct lu_env *env, struct cl_io *io,
enum cl_req_type iot, struct cl_2queue *queue,
- enum cl_req_priority prio, long timeout)
+ long timeout)
{
struct cl_sync_io *anchor = &cl_env_info(env)->clt_anchor;
struct cl_page *pg;
int rc;
- LASSERT(prio == CRP_NORMAL || prio == CRP_CANCEL);
-
cl_page_list_for_each(pg, &queue->c2_qin) {
LASSERT(pg->cp_sync_io == NULL);
pg->cp_sync_io = anchor;
}
cl_sync_io_init(anchor, queue->c2_qin.pl_nr);
- rc = cl_io_submit_rw(env, io, iot, queue, prio);
+ rc = cl_io_submit_rw(env, io, iot, queue);
if (rc == 0) {
/*
* If some pages weren't sent for any reason (e.g.,
}
cl_io_iter_fini(env, io);
} while (result == 0 && io->ci_continue);
- RETURN(result < 0 ? result : 0);
+ if (result == 0)
+ result = io->ci_result;
+ RETURN(result < 0 ? result : 0);
}
EXPORT_SYMBOL(cl_io_loop);
*/
void cl_page_list_init(struct cl_page_list *plist)
{
- ENTRY;
- plist->pl_nr = 0;
- CFS_INIT_LIST_HEAD(&plist->pl_pages);
- plist->pl_owner = cfs_current();
- EXIT;
+ ENTRY;
+ plist->pl_nr = 0;
+ CFS_INIT_LIST_HEAD(&plist->pl_pages);
+ plist->pl_owner = current;
+ EXIT;
}
EXPORT_SYMBOL(cl_page_list_init);
*/
void cl_page_list_add(struct cl_page_list *plist, struct cl_page *page)
{
- ENTRY;
- /* it would be better to check that page is owned by "current" io, but
- * it is not passed here. */
- LASSERT(page->cp_owner != NULL);
- LINVRNT(plist->pl_owner == cfs_current());
-
- cfs_lockdep_off();
- cfs_mutex_lock(&page->cp_mutex);
- cfs_lockdep_on();
- LASSERT(cfs_list_empty(&page->cp_batch));
- cfs_list_add_tail(&page->cp_batch, &plist->pl_pages);
- ++plist->pl_nr;
- page->cp_queue_ref = lu_ref_add(&page->cp_reference, "queue", plist);
- cl_page_get(page);
- EXIT;
+ ENTRY;
+ /* it would be better to check that page is owned by "current" io, but
+ * it is not passed here. */
+ LASSERT(page->cp_owner != NULL);
+ LINVRNT(plist->pl_owner == current);
+
+ lockdep_off();
+ mutex_lock(&page->cp_mutex);
+ lockdep_on();
+ LASSERT(cfs_list_empty(&page->cp_batch));
+ cfs_list_add_tail(&page->cp_batch, &plist->pl_pages);
+ ++plist->pl_nr;
+ lu_ref_add_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
+ cl_page_get(page);
+ EXIT;
}
EXPORT_SYMBOL(cl_page_list_add);
* Removes a page from a page list.
*/
void cl_page_list_del(const struct lu_env *env,
- struct cl_page_list *plist, struct cl_page *page)
+ struct cl_page_list *plist, struct cl_page *page)
{
- LASSERT(plist->pl_nr > 0);
- LINVRNT(plist->pl_owner == cfs_current());
+ LASSERT(plist->pl_nr > 0);
+ LINVRNT(plist->pl_owner == current);
- ENTRY;
- cfs_list_del_init(&page->cp_batch);
- cfs_lockdep_off();
- cfs_mutex_unlock(&page->cp_mutex);
- cfs_lockdep_on();
- --plist->pl_nr;
- lu_ref_del_at(&page->cp_reference, page->cp_queue_ref, "queue", plist);
- cl_page_put(env, page);
- EXIT;
+ ENTRY;
+ cfs_list_del_init(&page->cp_batch);
+ lockdep_off();
+ mutex_unlock(&page->cp_mutex);
+ lockdep_on();
+ --plist->pl_nr;
+ lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue", plist);
+ cl_page_put(env, page);
+ EXIT;
}
EXPORT_SYMBOL(cl_page_list_del);
* Moves a page from one page list to another.
*/
void cl_page_list_move(struct cl_page_list *dst, struct cl_page_list *src,
- struct cl_page *page)
+ struct cl_page *page)
{
- LASSERT(src->pl_nr > 0);
- LINVRNT(dst->pl_owner == cfs_current());
- LINVRNT(src->pl_owner == cfs_current());
+ LASSERT(src->pl_nr > 0);
+ LINVRNT(dst->pl_owner == current);
+ LINVRNT(src->pl_owner == current);
- ENTRY;
- cfs_list_move_tail(&page->cp_batch, &dst->pl_pages);
- --src->pl_nr;
- ++dst->pl_nr;
- lu_ref_set_at(&page->cp_reference,
- page->cp_queue_ref, "queue", src, dst);
- EXIT;
+ ENTRY;
+ cfs_list_move_tail(&page->cp_batch, &dst->pl_pages);
+ --src->pl_nr;
+ ++dst->pl_nr;
+ lu_ref_set_at(&page->cp_reference, &page->cp_queue_ref, "queue",
+ src, dst);
+ EXIT;
}
EXPORT_SYMBOL(cl_page_list_move);
*/
void cl_page_list_splice(struct cl_page_list *list, struct cl_page_list *head)
{
- struct cl_page *page;
- struct cl_page *tmp;
+ struct cl_page *page;
+ struct cl_page *tmp;
- LINVRNT(list->pl_owner == cfs_current());
- LINVRNT(head->pl_owner == cfs_current());
+ LINVRNT(list->pl_owner == current);
+ LINVRNT(head->pl_owner == current);
- ENTRY;
- cl_page_list_for_each_safe(page, tmp, list)
- cl_page_list_move(head, list, page);
- EXIT;
+ ENTRY;
+ cl_page_list_for_each_safe(page, tmp, list)
+ cl_page_list_move(head, list, page);
+ EXIT;
}
EXPORT_SYMBOL(cl_page_list_splice);
* Disowns pages in a queue.
*/
void cl_page_list_disown(const struct lu_env *env,
- struct cl_io *io, struct cl_page_list *plist)
-{
- struct cl_page *page;
- struct cl_page *temp;
-
- LINVRNT(plist->pl_owner == cfs_current());
-
- ENTRY;
- cl_page_list_for_each_safe(page, temp, plist) {
- LASSERT(plist->pl_nr > 0);
-
- cfs_list_del_init(&page->cp_batch);
- cfs_lockdep_off();
- cfs_mutex_unlock(&page->cp_mutex);
- cfs_lockdep_on();
- --plist->pl_nr;
- /*
- * cl_page_disown0 rather than usual cl_page_disown() is used,
- * because pages are possibly in CPS_FREEING state already due
- * to the call to cl_page_list_discard().
- */
- /*
- * XXX cl_page_disown0() will fail if page is not locked.
- */
- cl_page_disown0(env, io, page);
- lu_ref_del(&page->cp_reference, "queue", plist);
- cl_page_put(env, page);
- }
- EXIT;
+ struct cl_io *io, struct cl_page_list *plist)
+{
+ struct cl_page *page;
+ struct cl_page *temp;
+
+ LINVRNT(plist->pl_owner == current);
+
+ ENTRY;
+ cl_page_list_for_each_safe(page, temp, plist) {
+ LASSERT(plist->pl_nr > 0);
+
+ cfs_list_del_init(&page->cp_batch);
+ lockdep_off();
+ mutex_unlock(&page->cp_mutex);
+ lockdep_on();
+ --plist->pl_nr;
+ /*
+ * cl_page_disown0 rather than usual cl_page_disown() is used,
+ * because pages are possibly in CPS_FREEING state already due
+ * to the call to cl_page_list_discard().
+ */
+ /*
+ * XXX cl_page_disown0() will fail if page is not locked.
+ */
+ cl_page_disown0(env, io, page);
+ lu_ref_del_at(&page->cp_reference, &page->cp_queue_ref, "queue",
+ plist);
+ cl_page_put(env, page);
+ }
+ EXIT;
}
EXPORT_SYMBOL(cl_page_list_disown);
*/
void cl_page_list_fini(const struct lu_env *env, struct cl_page_list *plist)
{
- struct cl_page *page;
- struct cl_page *temp;
+ struct cl_page *page;
+ struct cl_page *temp;
- LINVRNT(plist->pl_owner == cfs_current());
+ LINVRNT(plist->pl_owner == current);
- ENTRY;
- cl_page_list_for_each_safe(page, temp, plist)
- cl_page_list_del(env, plist, page);
- LASSERT(plist->pl_nr == 0);
- EXIT;
+ ENTRY;
+ cl_page_list_for_each_safe(page, temp, plist)
+ cl_page_list_del(env, plist, page);
+ LASSERT(plist->pl_nr == 0);
+ EXIT;
}
EXPORT_SYMBOL(cl_page_list_fini);
* Owns all pages in a queue.
*/
int cl_page_list_own(const struct lu_env *env,
- struct cl_io *io, struct cl_page_list *plist)
-{
- struct cl_page *page;
- struct cl_page *temp;
- pgoff_t index = 0;
- int result;
-
- LINVRNT(plist->pl_owner == cfs_current());
-
- ENTRY;
- result = 0;
- cl_page_list_for_each_safe(page, temp, plist) {
- LASSERT(index <= page->cp_index);
- index = page->cp_index;
- if (cl_page_own(env, io, page) == 0)
- result = result ?: page->cp_error;
- else
- cl_page_list_del(env, plist, page);
- }
- RETURN(result);
+ struct cl_io *io, struct cl_page_list *plist)
+{
+ struct cl_page *page;
+ struct cl_page *temp;
+ pgoff_t index = 0;
+ int result;
+
+ LINVRNT(plist->pl_owner == current);
+
+ ENTRY;
+ result = 0;
+ cl_page_list_for_each_safe(page, temp, plist) {
+ LASSERT(index <= page->cp_index);
+ index = page->cp_index;
+ if (cl_page_own(env, io, page) == 0)
+ result = result ?: page->cp_error;
+ else
+ cl_page_list_del(env, plist, page);
+ }
+ RETURN(result);
}
EXPORT_SYMBOL(cl_page_list_own);
* Assumes all pages in a queue.
*/
void cl_page_list_assume(const struct lu_env *env,
- struct cl_io *io, struct cl_page_list *plist)
+ struct cl_io *io, struct cl_page_list *plist)
{
- struct cl_page *page;
+ struct cl_page *page;
- LINVRNT(plist->pl_owner == cfs_current());
+ LINVRNT(plist->pl_owner == current);
- cl_page_list_for_each(page, plist)
- cl_page_assume(env, io, page);
+ cl_page_list_for_each(page, plist)
+ cl_page_assume(env, io, page);
}
EXPORT_SYMBOL(cl_page_list_assume);
* Discards all pages in a queue.
*/
void cl_page_list_discard(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *plist)
+ struct cl_page_list *plist)
{
- struct cl_page *page;
+ struct cl_page *page;
- LINVRNT(plist->pl_owner == cfs_current());
- ENTRY;
- cl_page_list_for_each(page, plist)
- cl_page_discard(env, io, page);
- EXIT;
+ LINVRNT(plist->pl_owner == current);
+ ENTRY;
+ cl_page_list_for_each(page, plist)
+ cl_page_discard(env, io, page);
+ EXIT;
}
EXPORT_SYMBOL(cl_page_list_discard);
/**
- * Unmaps all pages in a queue from user virtual memory.
- */
-int cl_page_list_unmap(const struct lu_env *env, struct cl_io *io,
- struct cl_page_list *plist)
-{
- struct cl_page *page;
- int result;
-
- LINVRNT(plist->pl_owner == cfs_current());
- ENTRY;
- result = 0;
- cl_page_list_for_each(page, plist) {
- result = cl_page_unmap(env, io, page);
- if (result != 0)
- break;
- }
- RETURN(result);
-}
-EXPORT_SYMBOL(cl_page_list_unmap);
-
-/**
* Initialize dual page queue.
*/
void cl_2queue_init(struct cl_2queue *queue)
if (req->crq_o != NULL) {
for (i = 0; i < req->crq_nrobjs; ++i) {
struct cl_object *obj = req->crq_o[i].ro_obj;
- if (obj != NULL) {
- lu_object_ref_del_at(&obj->co_lu,
- req->crq_o[i].ro_obj_ref,
- "cl_req", req);
- cl_object_put(env, obj);
- }
+ if (obj != NULL) {
+ lu_object_ref_del_at(&obj->co_lu,
+ &req->crq_o[i].ro_obj_ref,
+ "cl_req", req);
+ cl_object_put(env, obj);
+ }
}
OBD_FREE(req->crq_o, req->crq_nrobjs * sizeof req->crq_o[0]);
}
ENTRY;
page = cl_page_top(page);
- LINVRNT(cl_page_is_vmlocked(env, page));
LASSERT(cfs_list_empty(&page->cp_flight));
LASSERT(page->cp_req == NULL);
+ CL_PAGE_DEBUG(D_PAGE, env, page, "req %p, %d, %u\n",
+ req, req->crq_type, req->crq_nrpages);
+
cfs_list_add_tail(&page->cp_flight, &req->crq_pages);
++req->crq_nrpages;
page->cp_req = req;
if (rqo->ro_obj == NULL) {
rqo->ro_obj = obj;
cl_object_get(obj);
- rqo->ro_obj_ref = lu_object_ref_add(&obj->co_lu,
- "cl_req", req);
+ lu_object_ref_add_at(&obj->co_lu, &rqo->ro_obj_ref,
+ "cl_req", req);
break;
}
}
ENTRY;
page = cl_page_top(page);
- LINVRNT(cl_page_is_vmlocked(env, page));
LASSERT(!cfs_list_empty(&page->cp_flight));
LASSERT(req->crq_nrpages > 0);
*/
void cl_sync_io_init(struct cl_sync_io *anchor, int nrpages)
{
- ENTRY;
- cfs_waitq_init(&anchor->csi_waitq);
- cfs_atomic_set(&anchor->csi_sync_nr, nrpages);
- anchor->csi_sync_rc = 0;
- EXIT;
+ ENTRY;
+ init_waitqueue_head(&anchor->csi_waitq);
+ cfs_atomic_set(&anchor->csi_sync_nr, nrpages);
+ cfs_atomic_set(&anchor->csi_barrier, nrpages > 0);
+ anchor->csi_sync_rc = 0;
+ EXIT;
}
EXPORT_SYMBOL(cl_sync_io_init);
}
LASSERT(cfs_atomic_read(&anchor->csi_sync_nr) == 0);
cl_page_list_assume(env, io, queue);
- POISON(anchor, 0x5a, sizeof *anchor);
- RETURN(rc);
+
+ /* wait until cl_sync_io_note() has done wakeup */
+ while (unlikely(cfs_atomic_read(&anchor->csi_barrier) != 0)) {
+#ifdef __KERNEL__
+ cpu_relax();
+#endif
+ }
+
+ POISON(anchor, 0x5a, sizeof *anchor);
+ RETURN(rc);
}
EXPORT_SYMBOL(cl_sync_io_wait);
* IO.
*/
LASSERT(cfs_atomic_read(&anchor->csi_sync_nr) > 0);
- if (cfs_atomic_dec_and_test(&anchor->csi_sync_nr))
- cfs_waitq_broadcast(&anchor->csi_waitq);
- EXIT;
+ if (cfs_atomic_dec_and_test(&anchor->csi_sync_nr)) {
+ wake_up_all(&anchor->csi_waitq);
+ /* it's safe to nuke or reuse anchor now */
+ cfs_atomic_set(&anchor->csi_barrier, 0);
+ }
+ EXIT;
}
EXPORT_SYMBOL(cl_sync_io_note);