#include <linux/version.h>
#include <linux/module.h>
#include <linux/fs.h>
+#include <linux/completion.h>
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
#include <linux/iobuf.h>
#endif
#include <asm/div64.h>
+#include <linux/smp_lock.h>
#else
#include <liblustre.h>
#endif
#include <linux/obd_echo.h>
#include <linux/lustre_debug.h>
#include <linux/lprocfs_status.h>
-#include <linux/lustre_lite.h> /* for LL_IOC_LOV_SETSTRIPE */
+
+static obd_id last_object_id;
#if 0
static void
struct lov_stripe_md *lsm = eco->eco_lsm;
int i;
- printk (KERN_INFO "%s: object %p: "LPX64", refs %d%s: "LPX64
- "=%u!%u@%d\n", msg, eco, eco->eco_id, eco->eco_refcount,
+ printk (KERN_INFO "Lustre: %s: object %p: "LPX64", refs %d%s: "LPX64
+ "=%u!%u\n", msg, eco, eco->eco_id, eco->eco_refcount,
eco->eco_deleted ? "(deleted) " : "",
lsm->lsm_object_id, lsm->lsm_stripe_size,
- lsm->lsm_stripe_count, lsm->lsm_stripe_offset);
+ lsm->lsm_stripe_count);
for (i = 0; i < lsm->lsm_stripe_count; i++)
- printk (KERN_INFO " [%2u]"LPX64"\n",
+ printk (KERN_INFO "Lustre: @%2u:"LPX64"\n",
lsm->lsm_oinfo[i].loi_ost_idx,
lsm->lsm_oinfo[i].loi_id);
}
static struct ec_object *
echo_find_object_locked (struct obd_device *obd, obd_id id)
{
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
struct ec_object *eco = NULL;
struct list_head *el;
echo_copyin_lsm (struct obd_device *obd, struct lov_stripe_md *lsm,
void *ulsm, int ulsm_nob)
{
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
int nob;
if (ulsm_nob < sizeof (*lsm))
if (ulsm_nob < nob ||
lsm->lsm_stripe_count > ec->ec_nstripes ||
lsm->lsm_magic != LOV_MAGIC ||
- (lsm->lsm_stripe_offset != 0 &&
- lsm->lsm_stripe_offset != 0xffffffff &&
- lsm->lsm_stripe_offset >= ec->ec_nstripes) ||
(lsm->lsm_stripe_size & (PAGE_SIZE - 1)) != 0 ||
((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
return (-EINVAL);
- LASSERT (ec->ec_lsmsize >= sizeof (*lsm) + nob);
-
if (copy_from_user(lsm->lsm_oinfo,
((struct lov_stripe_md *)ulsm)->lsm_oinfo, nob))
return (-EFAULT);
static struct ec_object *
echo_allocate_object (struct obd_device *obd)
{
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
struct ec_object *eco;
+ int rc;
- OBD_ALLOC (eco, sizeof (*eco));
+ OBD_ALLOC(eco, sizeof (*eco));
if (eco == NULL)
- return (NULL);
+ return NULL;
- OBD_ALLOC (eco->eco_lsm, ec->ec_lsmsize);
- if (eco->eco_lsm == NULL) {
- OBD_FREE (eco, sizeof (*eco));
- return (NULL);
+ rc = obd_alloc_memmd(ec->ec_exp, &eco->eco_lsm);
+ if (rc < 0) {
+ OBD_FREE(eco, sizeof (*eco));
+ return NULL;
}
eco->eco_device = obd;
echo_free_object (struct ec_object *eco)
{
struct obd_device *obd = eco->eco_device;
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
LASSERT (eco->eco_refcount == 0);
- OBD_FREE (eco->eco_lsm, ec->ec_lsmsize);
+ obd_free_memmd(ec->ec_exp, &eco->eco_lsm);
OBD_FREE (eco, sizeof (*eco));
}
-static int
-echo_create_object (struct obd_device *obd, int on_target, struct obdo *oa,
- void *ulsm, int ulsm_nob)
+static int echo_create_object(struct obd_device *obd, int on_target,
+ struct obdo *oa, void *ulsm, int ulsm_nob,
+ struct obd_trans_info *oti)
{
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
struct ec_object *eco2;
struct ec_object *eco;
struct lov_stripe_md *lsm;
int rc;
- int i;
+ int i, idx;
if ((oa->o_valid & OBD_MD_FLID) == 0 && /* no obj id */
(on_target || /* set_stripe */
return (-EINVAL);
}
- eco = echo_allocate_object (obd);
- if (eco == NULL)
- return (-ENOMEM);
+ if (ulsm != NULL) {
+ eco = echo_allocate_object (obd);
+ if (eco == NULL)
+ return (-ENOMEM);
- lsm = eco->eco_lsm;
+ lsm = eco->eco_lsm;
- if (ulsm != NULL) {
rc = echo_copyin_lsm (obd, lsm, ulsm, ulsm_nob);
if (rc != 0)
goto failed;
- }
- /* setup object ID here for !on_target and LOV hint */
- if ((oa->o_valid & OBD_MD_FLID) != 0)
- eco->eco_id = lsm->lsm_object_id = oa->o_id;
+ /* setup object ID here for !on_target and LOV hint */
+ if ((oa->o_valid & OBD_MD_FLID) != 0)
+ eco->eco_id = lsm->lsm_object_id = oa->o_id;
- /* defaults -> actual values */
- if (lsm->lsm_stripe_offset == 0xffffffff)
- lsm->lsm_stripe_offset = 0;
+ if (lsm->lsm_stripe_count == 0)
+ lsm->lsm_stripe_count = ec->ec_nstripes;
- if (lsm->lsm_stripe_count == 0)
- lsm->lsm_stripe_count = ec->ec_nstripes;
+ if (lsm->lsm_stripe_size == 0)
+ lsm->lsm_stripe_size = PAGE_SIZE;
- if (lsm->lsm_stripe_size == 0)
- lsm->lsm_stripe_size = PAGE_SIZE;
+ idx = ll_insecure_random_int();
- /* setup stripes: indices + default ids if required */
- for (i = 0; i < lsm->lsm_stripe_count; i++) {
- if (lsm->lsm_oinfo[i].loi_id == 0)
- lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id;
+ /* setup stripes: indices + default ids if required */
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ if (lsm->lsm_oinfo[i].loi_id == 0)
+ lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id;
- lsm->lsm_oinfo[i].loi_ost_idx =
- (lsm->lsm_stripe_offset + i) % ec->ec_nstripes;
+ lsm->lsm_oinfo[i].loi_ost_idx =
+ (idx + i) % ec->ec_nstripes;
+ }
+ } else {
+ OBD_ALLOC(eco, sizeof(*eco));
+ eco->eco_device = obd;
+ lsm = NULL;
}
+ if (oa->o_id == 0)
+ oa->o_id = ++last_object_id;
+
if (on_target) {
- rc = obd_create (&ec->ec_conn, oa, &lsm, NULL);
+ oa->o_gr = FILTER_GROUP_ECHO;
+ oa->o_valid |= OBD_MD_FLGROUP;
+ rc = obd_create(ec->ec_exp, oa, NULL, 0, &lsm, oti);
if (rc != 0)
goto failed;
/* See what object ID we were given */
- LASSERT ((oa->o_valid & OBD_MD_FLID) != 0);
- eco->eco_id = lsm->lsm_object_id = oa->o_id;
+ eco->eco_id = oa->o_id = lsm->lsm_object_id;
+ oa->o_valid |= OBD_MD_FLID;
+
+ LASSERT(eco->eco_lsm == NULL || eco->eco_lsm == lsm);
+ eco->eco_lsm = lsm;
}
spin_lock (&ec->ec_lock);
oa->o_id, on_target ? " (undoing create)" : "");
if (on_target)
- obd_destroy (&ec->ec_conn, oa, lsm, NULL);
+ obd_destroy(ec->ec_exp, oa, lsm, oti);
rc = -EEXIST;
goto failed;
list_add (&eco->eco_obj_chain, &ec->ec_objects);
spin_unlock (&ec->ec_lock);
CDEBUG (D_INFO,
- "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ "created %p: "LPX64"=%u#%u@%u refs %d del %d\n",
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_stripe_offset,
+ eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
return (0);
echo_get_object (struct ec_object **ecop, struct obd_device *obd,
struct obdo *oa)
{
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
struct ec_object *eco;
struct ec_object *eco2;
int rc;
- if ((oa->o_valid & OBD_MD_FLID) == 0)
+ if ((oa->o_valid & OBD_MD_FLID) == 0 ||
+ oa->o_id == 0) /* disallow use of object id 0 */
{
CERROR ("No valid oid\n");
return (-EINVAL);
spin_unlock (&ec->ec_lock);
*ecop = eco;
CDEBUG (D_INFO,
- "found %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ "found %p: "LPX64"=%u#%u@%u refs %d del %d\n",
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_stripe_offset,
+ eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
return (0);
}
eco->eco_refcount = 1;
*ecop = eco;
CDEBUG (D_INFO,
- "created %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ "created %p: "LPX64"=%u#%u@%d refs %d del %d\n",
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_stripe_offset,
+ eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
return (0);
}
rc = 0;
LASSERT (eco2->eco_id == eco2->eco_lsm->lsm_object_id);
CDEBUG (D_INFO,
- "found(2) %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ "found(2) %p: "LPX64"=%u#%u@%d refs %d del %d\n",
eco2, eco2->eco_id,
eco2->eco_lsm->lsm_stripe_size,
eco2->eco_lsm->lsm_stripe_count,
- eco2->eco_lsm->lsm_stripe_offset,
+ eco2->eco_lsm->lsm_oinfo[0].loi_ost_idx,
eco2->eco_refcount, eco2->eco_deleted);
}
echo_put_object (struct ec_object *eco)
{
struct obd_device *obd = eco->eco_device;
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
/* Release caller's ref on the object.
* delete => mark for deletion when last ref goes
eco->eco_refcount--;
LASSERT (eco->eco_refcount >= 0);
- CDEBUG(D_INFO, "put %p: "LPX64"=%u#%u&%d refs %d del %d\n",
+ CDEBUG(D_INFO, "put %p: "LPX64"=%u#%u@%d refs %d del %d\n",
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_stripe_offset,
+ eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
if (eco->eco_refcount != 0 || !eco->eco_deleted) {
* attempting to enqueue on this object number until we can be
* sure there will be no more lock callbacks.
*/
- obd_cancel_unused(&ec->ec_conn, eco->eco_lsm, 0, NULL);
+ obd_cancel_unused(ec->ec_exp, eco->eco_lsm, 0, NULL);
/* now we can let it go */
spin_lock (&ec->ec_lock);
*offp = offset * stripe_size + woffset % stripe_size;
}
+static void
+echo_client_page_debug_setup(struct lov_stripe_md *lsm,
+ struct page *page, int rw, obd_id id,
+ obd_off offset, obd_off count)
+{
+ char *addr;
+ obd_off stripe_off;
+ obd_id stripe_id;
+ int delta;
+
+ /* no partial pages on the client */
+ LASSERT(count == PAGE_SIZE);
+
+ addr = kmap(page);
+
+ for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+ if (rw == OBD_BRW_WRITE) {
+ stripe_off = offset + delta;
+ stripe_id = id;
+ echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);
+ } else {
+ stripe_off = 0xdeadbeef00c0ffeeULL;
+ stripe_id = 0xdeadbeef00c0ffeeULL;
+ }
+ block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
+ stripe_off, stripe_id);
+ }
+
+ kunmap(page);
+}
+
static int
-echo_client_kbrw (struct obd_device *obd, int rw,
- struct obdo *oa, struct lov_stripe_md *lsm,
- obd_off offset, obd_size count)
+echo_client_page_debug_check(struct lov_stripe_md *lsm,
+ struct page *page, obd_id id,
+ obd_off offset, obd_off count)
{
- struct echo_client_obd *ec = &obd->u.echo_client;
+ obd_off stripe_off;
+ obd_id stripe_id;
+ char *addr;
+ int delta;
+ int rc;
+ int rc2;
+
+ /* no partial pages on the client */
+ LASSERT(count == PAGE_SIZE);
+
+ addr = kmap(page);
+
+ for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+ stripe_off = offset + delta;
+ stripe_id = id;
+ echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
+
+ rc2 = block_debug_check("test_brw",
+ addr + delta, OBD_ECHO_BLOCK_SIZE,
+ stripe_off, stripe_id);
+ if (rc2 != 0) {
+ CERROR ("Error in echo object "LPX64"\n", id);
+ rc = rc2;
+ }
+ }
+
+ kunmap(page);
+ return rc;
+}
+
+static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,
+ struct lov_stripe_md *lsm, obd_off offset,
+ obd_size count, struct obd_trans_info *oti)
+{
+ struct echo_client_obd *ec = &obd->u.echocli;
obd_count npages;
struct brw_page *pga;
struct brw_page *pgp;
obd_off off;
int i;
int rc;
- int verify;
+ int verify = 0;
int gfp_mask;
- /* oa_id == 0 => speed test (no verification) else...
- * oa & 1 => use HIGHMEM
- */
- verify = (oa->o_id != 0);
- gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+ verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
+ (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+
+ gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
goto out;
pgp->count = PAGE_SIZE;
- pgp->off = off;
+ pgp->disk_offset = pgp->page_offset = off;
pgp->flag = 0;
- if (verify) {
- void *addr = kmap(pgp->pg);
- obd_off stripe_off = off;
- obd_id stripe_id = oa->o_id;
-
- if (rw == OBD_BRW_WRITE) {
- echo_get_stripe_off_id(lsm, &stripe_off,
- &stripe_id);
- page_debug_setup(addr, pgp->count,
- stripe_off, stripe_id);
- } else {
- page_debug_setup(addr, pgp->count,
- 0xdeadbeef00c0ffee,
- 0xdeadbeef00c0ffee);
- }
- kunmap(pgp->pg);
- }
+ if (verify)
+ echo_client_page_debug_setup(lsm, pgp->pg, rw,
+ oa->o_id, off, pgp->count);
}
- rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, NULL);
+ rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti);
out:
- if (rc != 0)
+ if (rc != 0 || rw != OBD_BRW_READ)
verify = 0;
for (i = 0, pgp = pga; i < npages; i++, pgp++) {
continue;
if (verify) {
- void *addr = kmap(pgp->pg);
- obd_off stripe_off = pgp->off;
- obd_id stripe_id = oa->o_id;
- int vrc;
-
- echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
- vrc = page_debug_check("test_brw", addr, pgp->count,
- stripe_off, stripe_id);
+ int vrc;
+ vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id,
+ pgp->page_offset,
+ pgp->count);
if (vrc != 0 && rc == 0)
rc = vrc;
-
- kunmap(pgp->pg);
}
__free_pages(pgp->pg, 0);
}
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
static int echo_client_ubrw(struct obd_device *obd, int rw,
struct obdo *oa, struct lov_stripe_md *lsm,
- obd_off offset, obd_size count, char *buffer)
+ obd_off offset, obd_size count, char *buffer,
+ struct obd_trans_info *oti)
{
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
obd_count npages;
struct brw_page *pga;
struct brw_page *pgp;
for (i = 0, off = offset, pgp = pga;
i < npages;
i++, off += PAGE_SIZE, pgp++) {
- pgp->off = off;
+ pgp->disk_offset = pgp->page_offset = off;
pgp->pg = kiobuf->maplist[i];
pgp->count = PAGE_SIZE;
pgp->flag = 0;
}
- rc = obd_brw(rw, &ec->ec_conn, lsm, npages, pga, NULL);
+ rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti);
// if (rw == OBD_BRW_READ)
// mark_dirty_kiobuf (kiobuf, count);
#else
static int echo_client_ubrw(struct obd_device *obd, int rw,
struct obdo *oa, struct lov_stripe_md *lsm,
- obd_off offset, obd_size count, char *buffer)
+ obd_off offset, obd_size count, char *buffer,
+ struct obd_trans_info *oti)
{
+#warning "echo_client_ubrw() needs to be ported on 2.6 yet"
LBUG();
return 0;
}
#endif
#endif
-static int
-echo_open (struct obd_export *exp, struct obdo *oa)
+struct echo_async_state;
+
+#define EAP_MAGIC 79277927
+struct echo_async_page {
+ int eap_magic;
+ struct page *eap_page;
+ void *eap_cookie;
+ obd_off eap_off;
+ struct echo_async_state *eap_eas;
+ struct list_head eap_item;
+};
+
+#define EAP_FROM_COOKIE(c) \
+ (LASSERT(((struct echo_async_page *)(c))->eap_magic == EAP_MAGIC), \
+ (struct echo_async_page *)(c))
+
+struct echo_async_state {
+ spinlock_t eas_lock;
+ obd_off eas_next_offset;
+ obd_off eas_end_offset;
+ int eas_in_flight;
+ int eas_rc;
+ wait_queue_head_t eas_waitq;
+ struct list_head eas_avail;
+ struct obdo eas_oa;
+ struct lov_stripe_md *eas_lsm;
+};
+
+static int eas_should_wake(struct echo_async_state *eas)
{
- struct obd_device *obd = exp->exp_obd;
- struct echo_client_obd *ec = &obd->u.echo_client;
- struct lustre_handle *ufh = obdo_handle (oa);
- struct ec_open_object *ecoo;
- struct ec_object *eco;
- int rc;
+ unsigned long flags;
+ int rc = 0;
+ spin_lock_irqsave(&eas->eas_lock, flags);
+ if (eas->eas_rc == 0 && !list_empty(&eas->eas_avail))
+ rc = 1;
+ spin_unlock_irqrestore(&eas->eas_lock, flags);
+ return rc;
+};
- rc = echo_get_object (&eco, obd, oa);
- if (rc != 0)
- return rc;
+static int ec_ap_make_ready(void *data, int cmd)
+{
+ /* our pages are issued ready */
+ LBUG();
+ return 0;
+}
+static int ec_ap_refresh_count(void *data, int cmd)
+{
+ /* our pages are issued with a stable count */
+ LBUG();
+ return PAGE_SIZE;
+}
+static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
+{
+ struct echo_async_page *eap = EAP_FROM_COOKIE(data);
- rc = -ENOMEM;
- OBD_ALLOC (ecoo, sizeof (*ecoo));
- if (ecoo == NULL)
- goto failed_0;
+ memcpy(oa, &eap->eap_eas->eas_oa, sizeof(*oa));
+}
- rc = obd_open(&ec->ec_conn, oa, eco->eco_lsm, NULL, &ecoo->ecoo_och);
- if (rc != 0)
- goto failed_1;
+static void ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
+{
+ struct echo_async_page *eap = EAP_FROM_COOKIE(data);
+ struct echo_async_state *eas;
+ unsigned long flags;
+
+ eas = eap->eap_eas;
+
+ if (cmd == OBD_BRW_READ &&
+ eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID &&
+ (eas->eas_oa.o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (eas->eas_oa.o_flags & OBD_FL_DEBUG_CHECK) != 0)
+ echo_client_page_debug_check(eas->eas_lsm, eap->eap_page,
+ eas->eas_oa.o_id, eap->eap_off,
+ PAGE_SIZE);
+
+ spin_lock_irqsave(&eas->eas_lock, flags);
+ if (rc && !eas->eas_rc)
+ eas->eas_rc = rc;
+ eas->eas_in_flight--;
+ list_add(&eap->eap_item, &eas->eas_avail);
+ wake_up(&eas->eas_waitq);
+ spin_unlock_irqrestore(&eas->eas_lock, flags);
+}
- memcpy (&ecoo->ecoo_oa, oa, sizeof (*oa));
- ecoo->ecoo_object = eco;
- /* ecoo takes ref from echo_get_object() above */
+static struct obd_async_page_ops ec_async_page_ops = {
+ .ap_make_ready = ec_ap_make_ready,
+ .ap_refresh_count = ec_ap_refresh_count,
+ .ap_fill_obdo = ec_ap_fill_obdo,
+ .ap_completion = ec_ap_completion,
+};
- spin_lock (&ec->ec_lock);
+static int echo_client_async_page(struct obd_export *exp, int rw,
+ struct obdo *oa, struct lov_stripe_md *lsm,
+ obd_off offset, obd_size count,
+ obd_size batching)
+{
+ obd_count npages, i;
+ struct echo_async_page *eap;
+ struct echo_async_state eas;
+ struct list_head *pos, *n;
+ int rc = 0;
+ unsigned long flags;
+ LIST_HEAD(pages);
+#if 0
+ int verify;
+ int gfp_mask;
+ /* oa_id == 0 => speed test (no verification) else...
+ * oa & 1 => use HIGHMEM
+ */
+ verify = (oa->o_id != 0);
+ gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+#endif
- list_add (&ecoo->ecoo_exp_chain, &exp->exp_ec_data.eced_open_head);
- ufh->cookie = ecoo->ecoo_cookie = ec->ec_unique++;
- spin_unlock (&ec->ec_lock);
- return 0;
+ LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
- failed_1:
- OBD_FREE (ecoo, sizeof (*ecoo));
- failed_0:
- echo_put_object (eco);
- return (rc);
+ if (count <= 0 ||
+ (count & (PAGE_SIZE - 1)) != 0 ||
+ (lsm != NULL &&
+ lsm->lsm_object_id != oa->o_id))
+ return (-EINVAL);
+
+ /* XXX think again with misaligned I/O */
+ npages = batching >> PAGE_SHIFT;
+
+ memcpy(&eas.eas_oa, oa, sizeof(*oa));
+ eas.eas_next_offset = offset;
+ eas.eas_end_offset = offset + count;
+ spin_lock_init(&eas.eas_lock);
+ init_waitqueue_head(&eas.eas_waitq);
+ eas.eas_in_flight = 0;
+ eas.eas_rc = 0;
+ eas.eas_lsm = lsm;
+ INIT_LIST_HEAD(&eas.eas_avail);
+
+ /* prepare the group of pages that we're going to be keeping
+ * in flight */
+ for (i = 0; i < npages; i++) {
+ struct page *page = alloc_page(GFP_KERNEL);
+ if (page == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ page->private = 0;
+ list_add_tail(&PAGE_LIST(page), &pages);
+
+ OBD_ALLOC(eap, sizeof(*eap));
+ if (eap == NULL)
+ GOTO(out, rc = -ENOMEM);
+
+ eap->eap_magic = EAP_MAGIC;
+ eap->eap_page = page;
+ eap->eap_eas = &eas;
+ page->private = (unsigned long)eap;
+ list_add_tail(&eap->eap_item, &eas.eas_avail);
+ }
+
+ /* first we spin queueing io and being woken by its completion */
+ spin_lock_irqsave(&eas.eas_lock, flags);
+ for(;;) {
+ int rc;
+
+ /* sleep until we have a page to send */
+ spin_unlock_irqrestore(&eas.eas_lock, flags);
+ rc = wait_event_interruptible(eas.eas_waitq,
+ eas_should_wake(&eas));
+ spin_lock_irqsave(&eas.eas_lock, flags);
+ if (rc && !eas.eas_rc)
+ eas.eas_rc = rc;
+ if (eas.eas_rc)
+ break;
+ if (list_empty(&eas.eas_avail))
+ continue;
+ eap = list_entry(eas.eas_avail.next, struct echo_async_page,
+ eap_item);
+ list_del(&eap->eap_item);
+ spin_unlock_irqrestore(&eas.eas_lock, flags);
+
+ /* unbind the eap from its old page offset */
+ if (eap->eap_cookie != NULL) {
+ obd_teardown_async_page(exp, lsm, NULL,
+ eap->eap_cookie);
+ eap->eap_cookie = NULL;
+ }
+
+ eas.eas_next_offset += PAGE_SIZE;
+ eap->eap_off = eas.eas_next_offset;
+
+ rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
+ eap->eap_off, &ec_async_page_ops,
+ eap, &eap->eap_cookie);
+ if (rc) {
+ spin_lock_irqsave(&eas.eas_lock, flags);
+ eas.eas_rc = rc;
+ break;
+ }
+
+ if (oa->o_id != ECHO_PERSISTENT_OBJID &&
+ (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0)
+ echo_client_page_debug_setup(lsm, eap->eap_page, rw,
+ oa->o_id,
+ eap->eap_off, PAGE_SIZE);
+
+ /* always asserts urgent, which isn't quite right */
+ rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie,
+ rw, 0, PAGE_SIZE, 0,
+ ASYNC_READY | ASYNC_URGENT |
+ ASYNC_COUNT_STABLE);
+ spin_lock_irqsave(&eas.eas_lock, flags);
+ if (rc && !eas.eas_rc) {
+ eas.eas_rc = rc;
+ break;
+ }
+ eas.eas_in_flight++;
+ if (eas.eas_next_offset == eas.eas_end_offset)
+ break;
+ }
+
+ /* still hold the eas_lock here.. */
+
+ /* now we just spin waiting for all the rpcs to complete */
+ while(eas.eas_in_flight) {
+ spin_unlock_irqrestore(&eas.eas_lock, flags);
+ wait_event_interruptible(eas.eas_waitq,
+ eas.eas_in_flight == 0);
+ spin_lock_irqsave(&eas.eas_lock, flags);
+ }
+ spin_unlock_irqrestore(&eas.eas_lock, flags);
+
+out:
+ list_for_each_safe(pos, n, &pages) {
+ struct page *page = list_entry(pos, struct page,
+ PAGE_LIST_ENTRY);
+
+ list_del(&PAGE_LIST(page));
+ if (page->private != 0) {
+ eap = (struct echo_async_page *)page->private;
+ if (eap->eap_cookie != NULL)
+ obd_teardown_async_page(exp, lsm, NULL,
+ eap->eap_cookie);
+ OBD_FREE(eap, sizeof(*eap));
+ }
+ __free_page(page);
+ }
+
+ RETURN(rc);
}
-static int
-echo_close (struct obd_export *exp, struct obdo *oa)
+static int echo_client_prep_commit(struct obd_export *exp, int rw,
+ struct obdo *oa, struct lov_stripe_md *lsm,
+ obd_off offset, obd_size count,
+ obd_size batch, struct obd_trans_info *oti)
{
- struct obd_device *obd = exp->exp_obd;
- struct echo_client_obd *ec = &obd->u.echo_client;
- struct lustre_handle *ufh = obdo_handle (oa);
- struct ec_open_object *ecoo = NULL;
- int found = 0;
- struct list_head *el;
- int rc;
+ struct obd_ioobj ioo;
+ struct niobuf_local *lnb;
+ struct niobuf_remote *rnb;
+ obd_off off;
+ obd_size npages, tot_pages;
+ int i, ret = 0;
+ ENTRY;
- if ((oa->o_valid & OBD_MD_FLHANDLE) == 0)
- return -EINVAL;
+ if (count <= 0 || (count & (PAGE_SIZE - 1)) != 0 ||
+ (lsm != NULL && lsm->lsm_object_id != oa->o_id))
+ RETURN(-EINVAL);
- spin_lock (&ec->ec_lock);
+ npages = batch >> PAGE_SHIFT;
+ tot_pages = count >> PAGE_SHIFT;
- list_for_each (el, &exp->exp_ec_data.eced_open_head) {
- ecoo = list_entry (el, struct ec_open_object, ecoo_exp_chain);
- found = (ecoo->ecoo_cookie == ufh->cookie);
- if (found) {
- list_del (&ecoo->ecoo_exp_chain);
- break;
+ OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
+ OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
+
+ if (lnb == NULL || rnb == NULL)
+ GOTO(out, ret = -ENOMEM);
+
+ obdo_to_ioobj(oa, &ioo);
+
+ off = offset;
+
+ for(; tot_pages; tot_pages -= npages) {
+ if (tot_pages < npages)
+ npages = tot_pages;
+
+ for (i = 0; i < npages; i++, off += PAGE_SIZE) {
+ rnb[i].offset = off;
+ rnb[i].len = PAGE_SIZE;
}
+
+ /* XXX this can't be the best.. */
+ memset(oti, 0, sizeof(*oti));
+ ioo.ioo_bufcnt = npages;
+
+ ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti);
+ if (ret != 0)
+ GOTO(out, ret);
+
+ for (i = 0; i < npages; i++) {
+ struct page *page = lnb[i].page;
+
+ /* read past eof? */
+ if (page == NULL && lnb[i].rc == 0)
+ continue;
+
+ if (oa->o_id == ECHO_PERSISTENT_OBJID ||
+ (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
+ continue;
+
+
+ if (rw == OBD_BRW_WRITE)
+ echo_client_page_debug_setup(lsm, page, rw,
+ oa->o_id,
+ rnb[i].offset,
+ rnb[i].len);
+ else
+ echo_client_page_debug_check(lsm, page,
+ oa->o_id,
+ rnb[i].offset,
+ rnb[i].len);
+ }
+
+ ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);
+ if (ret != 0)
+ GOTO(out, ret);
}
- spin_unlock (&ec->ec_lock);
+out:
+ if (lnb)
+ OBD_FREE(lnb, npages * sizeof(struct niobuf_local));
+ if (rnb)
+ OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));
+ RETURN(ret);
+}
- memcpy(&ecoo->ecoo_oa.o_inline, &ecoo->ecoo_och, FD_OSTDATA_SIZE);
- ecoo->ecoo_oa.o_valid |= OBD_MD_FLHANDLE;
+int echo_client_brw_ioctl(int rw, struct obd_export *exp,
+ struct obd_ioctl_data *data)
+{
+ struct obd_device *obd = class_exp2obd(exp);
+ struct echo_client_obd *ec = &obd->u.echocli;
+ struct obd_trans_info dummy_oti;
+ struct ec_object *eco;
+ int rc;
+ ENTRY;
- rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
- ecoo->ecoo_object->eco_lsm, NULL);
+ rc = echo_get_object(&eco, obd, &data->ioc_obdo1);
+ if (rc)
+ RETURN(rc);
- echo_put_object (ecoo->ecoo_object);
- OBD_FREE (ecoo, sizeof (*ecoo));
+ memset(&dummy_oti, 0, sizeof(dummy_oti));
- return (rc);
+ data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE;
+ data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP;
+ data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO;
+
+ switch((long)data->ioc_pbuf1) {
+ case 1:
+ if (data->ioc_pbuf2 == NULL) { // NULL user data pointer
+ rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
+ eco->eco_lsm, data->ioc_offset,
+ data->ioc_count, &dummy_oti);
+ } else {
+#ifdef __KERNEL__
+ rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
+ eco->eco_lsm, data->ioc_offset,
+ data->ioc_count, data->ioc_pbuf2,
+ &dummy_oti);
+#endif
+ }
+ break;
+ case 2:
+ rc = echo_client_async_page(ec->ec_exp, rw, &data->ioc_obdo1,
+ eco->eco_lsm, data->ioc_offset,
+ data->ioc_count, data->ioc_plen1);
+ break;
+ case 3:
+ rc = echo_client_prep_commit(ec->ec_exp, rw, &data->ioc_obdo1,
+ eco->eco_lsm, data->ioc_offset,
+ data->ioc_count, data->ioc_plen1,
+ &dummy_oti);
+ break;
+ default:
+ rc = -EINVAL;
+ }
+ echo_put_object(eco);
+ RETURN(rc);
}
static int
void *data, int flag)
{
struct ec_object *eco = (struct ec_object *)data;
- struct echo_client_obd *ec = &(eco->eco_device->u.echo_client);
+ struct echo_client_obd *ec = &(eco->eco_device->u.echocli);
struct lustre_handle lockh;
struct list_head *el;
int found = 0;
}
static int
-echo_enqueue (struct obd_export *exp, struct obdo *oa,
- int mode, obd_off offset, obd_size nob)
+echo_client_enqueue(struct obd_export *exp, struct obdo *oa,
+ int mode, obd_off offset, obd_size nob)
{
struct obd_device *obd = exp->exp_obd;
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
struct lustre_handle *ulh = obdo_handle (oa);
struct ec_object *eco;
struct ec_lock *ecl;
ecl->ecl_mode = mode;
ecl->ecl_object = eco;
- ecl->ecl_extent.start = offset;
- ecl->ecl_extent.end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
+ ecl->ecl_policy.l_extent.start = offset;
+ ecl->ecl_policy.l_extent.end =
+ (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
flags = 0;
- rc = obd_enqueue(&ec->ec_conn, eco->eco_lsm, NULL, LDLM_EXTENT,
- &ecl->ecl_extent,sizeof(ecl->ecl_extent), mode,
- &flags, echo_ldlm_callback, eco,
- &ecl->ecl_lock_handle);
+ rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, LDLM_EXTENT,
+ &ecl->ecl_policy, mode, &flags, echo_ldlm_callback,
+ ldlm_completion_ast, NULL, eco, sizeof(struct ost_lvb),
+ lustre_swab_ost_lvb, &ecl->ecl_lock_handle);
if (rc != 0)
goto failed_1;
}
static int
-echo_cancel (struct obd_export *exp, struct obdo *oa)
+echo_client_cancel(struct obd_export *exp, struct obdo *oa)
{
struct obd_device *obd = exp->exp_obd;
- struct echo_client_obd *ec = &obd->u.echo_client;
+ struct echo_client_obd *ec = &obd->u.echocli;
struct lustre_handle *ulh = obdo_handle (oa);
struct ec_lock *ecl = NULL;
int found = 0;
if (!found)
return (-ENOENT);
- rc = obd_cancel(&ec->ec_conn, ecl->ecl_object->eco_lsm, ecl->ecl_mode,
+ rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm, ecl->ecl_mode,
&ecl->ecl_lock_handle);
echo_put_object (ecl->ecl_object);
return rc;
}
-static int echo_iocontrol(unsigned int cmd, struct lustre_handle *obdconn,
- int len, void *karg, void *uarg)
+static int
+echo_client_iocontrol(unsigned int cmd, struct obd_export *exp,
+ int len, void *karg, void *uarg)
{
- struct obd_export *exp = class_conn2export (obdconn);
struct obd_device *obd;
struct echo_client_obd *ec;
struct ec_object *eco;
struct obd_ioctl_data *data = karg;
+ struct obd_trans_info dummy_oti;
+ struct oti_req_ack_lock *ack_lock;
+ struct obdo *oa;
int rw = OBD_BRW_READ;
int rc = 0;
+ int i;
ENTRY;
- if (exp == NULL) {
- CERROR("ioctl: No device\n");
- GOTO(out, rc = -EINVAL);
- }
+ unlock_kernel();
+
+ memset(&dummy_oti, 0, sizeof(dummy_oti));
obd = exp->exp_obd;
- ec = &obd->u.echo_client;
+ ec = &obd->u.echocli;
switch (cmd) {
case OBD_IOC_CREATE: /* may create echo object */
GOTO (out, rc = -EPERM);
rc = echo_create_object (obd, 1, &data->ioc_obdo1,
- data->ioc_pbuf1, data->ioc_plen1);
+ data->ioc_pbuf1, data->ioc_plen1,
+ &dummy_oti);
GOTO(out, rc);
case OBD_IOC_DESTROY:
rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
if (rc == 0) {
- rc = obd_destroy(&ec->ec_conn, &data->ioc_obdo1,
- eco->eco_lsm, NULL);
+ oa = &data->ioc_obdo1;
+ oa->o_gr = FILTER_GROUP_ECHO;
+ oa->o_valid |= OBD_MD_FLGROUP;
+ rc = obd_destroy(ec->ec_exp, oa, eco->eco_lsm,
+ &dummy_oti);
if (rc == 0)
eco->eco_deleted = 1;
echo_put_object(eco);
case OBD_IOC_GETATTR:
rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
if (rc == 0) {
- rc = obd_getattr(&ec->ec_conn, &data->ioc_obdo1,
+ rc = obd_getattr(ec->ec_exp, &data->ioc_obdo1,
eco->eco_lsm);
echo_put_object(eco);
}
rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
if (rc == 0) {
- rc = obd_setattr(&ec->ec_conn, &data->ioc_obdo1,
+ rc = obd_setattr(ec->ec_exp, &data->ioc_obdo1,
eco->eco_lsm, NULL);
echo_put_object(eco);
}
GOTO(out, rc);
- case OBD_IOC_OPEN:
- rc = echo_open (exp, &data->ioc_obdo1);
- GOTO(out, rc);
-
- case OBD_IOC_CLOSE:
- rc = echo_close (exp, &data->ioc_obdo1);
- GOTO(out, rc);
-
case OBD_IOC_BRW_WRITE:
if (!capable (CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
rw = OBD_BRW_WRITE;
/* fall through */
case OBD_IOC_BRW_READ:
- rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
- if (rc == 0) {
- if (data->ioc_pbuf2 == NULL) // NULL user data pointer
- rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
- eco->eco_lsm,
- data->ioc_offset,
- data->ioc_count);
- else
-#ifdef __KERNEL__
- rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
- eco->eco_lsm,
- data->ioc_offset,
- data->ioc_count,
- data->ioc_pbuf2);
-#endif
- echo_put_object(eco);
- }
+ rc = echo_client_brw_ioctl(rw, exp, data);
GOTO(out, rc);
case ECHO_IOC_GET_STRIPE:
} else {
rc = echo_create_object(obd, 0, &data->ioc_obdo1,
data->ioc_pbuf1,
- data->ioc_plen1);
+ data->ioc_plen1, &dummy_oti);
}
GOTO (out, rc);
if (!capable (CAP_SYS_ADMIN))
GOTO (out, rc = -EPERM);
- rc = echo_enqueue (exp, &data->ioc_obdo1,
- data->ioc_conn1, /* lock mode */
+ rc = echo_client_enqueue(exp, &data->ioc_obdo1,
+ data->ioc_conn1, /* lock mode */
data->ioc_offset, data->ioc_count);/*extent*/
GOTO (out, rc);
case ECHO_IOC_CANCEL:
- rc = echo_cancel (exp, &data->ioc_obdo1);
+ rc = echo_client_cancel(exp, &data->ioc_obdo1);
GOTO (out, rc);
default:
EXIT;
out:
- class_export_put(exp);
+
+ /* XXX this should be in a helper also called by target_send_reply */
+ for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
+ i++, ack_lock++) {
+ if (!ack_lock->mode)
+ break;
+ ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
+ }
+
+ lock_kernel();
+
return rc;
}
-static int echo_setup(struct obd_device *obddev, obd_count len, void *buf)
+static int
+echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
{
- struct obd_ioctl_data* data = buf;
- struct echo_client_obd *ec = &obddev->u.echo_client;
+ struct lustre_cfg* lcfg = buf;
+ struct echo_client_obd *ec = &obddev->u.echocli;
struct obd_device *tgt;
- struct lov_stripe_md *lsm = NULL;
+ struct lustre_handle conn = {0, };
struct obd_uuid echo_uuid = { "ECHO_UUID" };
int rc;
ENTRY;
- if (data->ioc_inllen1 < 1) {
+ if (lcfg->lcfg_inllen1 < 1) {
CERROR("requires a TARGET OBD name\n");
RETURN(-EINVAL);
}
- tgt = class_name2obd(data->ioc_inlbuf1);
+ tgt = class_name2obd(lcfg->lcfg_inlbuf1);
if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
- CERROR("device not attached or not set up (%d/%s)\n",
- data->ioc_dev, data->ioc_inlbuf1);
+ CERROR("device not attached or not set up (%s)\n",
+ lcfg->lcfg_inlbuf1);
RETURN(-EINVAL);
}
INIT_LIST_HEAD (&ec->ec_objects);
ec->ec_unique = 0;
- rc = obd_connect(&ec->ec_conn, tgt, &echo_uuid);
+ rc = obd_connect(&conn, tgt, &echo_uuid, NULL, FILTER_GROUP_ECHO);
if (rc) {
- CERROR("fail to connect to device %d\n", data->ioc_dev);
+ CERROR("fail to connect to device %s\n", lcfg->lcfg_inlbuf1);
return (rc);
}
-
- ec->ec_lsmsize = obd_alloc_memmd (&ec->ec_conn, &lsm);
- if (ec->ec_lsmsize < 0) {
- CERROR ("Can't get # stripes: %d\n", rc);
- obd_disconnect (&ec->ec_conn, 0);
- rc = ec->ec_lsmsize;
- } else {
- ec->ec_nstripes = lsm->lsm_stripe_count;
- obd_free_memmd (&ec->ec_conn, &lsm);
- }
+ ec->ec_exp = class_conn2export(&conn);
RETURN(rc);
}
-static int echo_cleanup(struct obd_device * obddev, int force, int failover)
+static int echo_client_cleanup(struct obd_device *obddev, int flags)
{
struct list_head *el;
struct ec_object *eco;
- struct echo_client_obd *ec = &obddev->u.echo_client;
+ struct echo_client_obd *ec = &obddev->u.echocli;
int rc;
ENTRY;
}
/* XXX assuming sole access */
- while (!list_empty (&ec->ec_objects)) {
+ while (!list_empty(&ec->ec_objects)) {
el = ec->ec_objects.next;
- eco = list_entry (el, struct ec_object, eco_obj_chain);
+ eco = list_entry(el, struct ec_object, eco_obj_chain);
- LASSERT (eco->eco_refcount == 0);
+ LASSERT(eco->eco_refcount == 0);
eco->eco_refcount = 1;
eco->eco_deleted = 1;
- echo_put_object (eco);
+ echo_put_object(eco);
}
- rc = obd_disconnect (&ec->ec_conn, 0);
+ rc = obd_disconnect(ec->ec_exp, 0);
if (rc != 0)
CERROR("fail to disconnect device: %d\n", rc);
- RETURN (rc);
+ RETURN(rc);
}
-static int echo_connect(struct lustre_handle *conn, struct obd_device *src,
- struct obd_uuid *cluuid)
+static int echo_client_connect(struct lustre_handle *conn,
+ struct obd_device *src,
+ struct obd_uuid *cluuid,
+ struct obd_connect_data *data,
+ unsigned long flags)
{
struct obd_export *exp;
int rc;
rc = class_connect(conn, src, cluuid);
if (rc == 0) {
- exp = class_conn2export (conn);
- INIT_LIST_HEAD(&exp->exp_ec_data.eced_open_head);
+ exp = class_conn2export(conn);
INIT_LIST_HEAD(&exp->exp_ec_data.eced_locks);
class_export_put(exp);
}
RETURN (rc);
}
-static int echo_disconnect(struct lustre_handle *conn, int failover)
+static int echo_client_disconnect(struct obd_export *exp,
+ unsigned long flags)
{
- struct obd_export *exp = class_conn2export (conn);
struct obd_device *obd;
struct echo_client_obd *ec;
- struct ec_open_object *ecoo;
struct ec_lock *ecl;
int rc;
+ ENTRY;
if (exp == NULL)
GOTO(out, rc = -EINVAL);
obd = exp->exp_obd;
- ec = &obd->u.echo_client;
+ ec = &obd->u.echocli;
/* no more contention on export's lock list */
while (!list_empty (&exp->exp_ec_data.eced_locks)) {
struct ec_lock, ecl_exp_chain);
list_del (&ecl->ecl_exp_chain);
- rc = obd_cancel (&ec->ec_conn, ecl->ecl_object->eco_lsm,
+ rc = obd_cancel(ec->ec_exp, ecl->ecl_object->eco_lsm,
ecl->ecl_mode, &ecl->ecl_lock_handle);
- CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect (%d)\n",
- ecl->ecl_object->eco_id, rc);
+ CDEBUG (D_INFO, "Cancel lock on object "LPX64" on disconnect "
+ "(%d)\n", ecl->ecl_object->eco_id, rc);
echo_put_object (ecl->ecl_object);
OBD_FREE (ecl, sizeof (*ecl));
}
- /* no more contention on export's open handle list */
- while (!list_empty (&exp->exp_ec_data.eced_open_head)) {
- ecoo = list_entry (exp->exp_ec_data.eced_open_head.next,
- struct ec_open_object, ecoo_exp_chain);
- list_del (&ecoo->ecoo_exp_chain);
-
- memcpy (&ecoo->ecoo_oa.o_inline, &ecoo->ecoo_och,
- FD_OSTDATA_SIZE);
- ecoo->ecoo_oa.o_valid |= OBD_MD_FLHANDLE;
-
- rc = obd_close (&ec->ec_conn, &ecoo->ecoo_oa,
- ecoo->ecoo_object->eco_lsm, NULL);
-
- CDEBUG (D_INFO, "Closed object "LPX64" on disconnect (%d)\n",
- ecoo->ecoo_oa.o_id, rc);
-
- echo_put_object (ecoo->ecoo_object);
- OBD_FREE (ecoo, sizeof (*ecoo));
- }
-
- rc = class_disconnect (conn, 0);
+ rc = class_disconnect(exp, 0);
GOTO(out, rc);
out:
- class_export_put(exp);
return rc;
}
static struct obd_ops echo_obd_ops = {
- o_owner: THIS_MODULE,
- o_setup: echo_setup,
- o_cleanup: echo_cleanup,
- o_iocontrol: echo_iocontrol,
- o_connect: echo_connect,
- o_disconnect: echo_disconnect
+ .o_owner = THIS_MODULE,
+ .o_setup = echo_client_setup,
+ .o_cleanup = echo_client_cleanup,
+ .o_iocontrol = echo_client_iocontrol,
+ .o_connect = echo_client_connect,
+ .o_disconnect = echo_client_disconnect
};
int echo_client_init(void)
{
struct lprocfs_static_vars lvars;
- lprocfs_init_vars(&lvars);
- return class_register_type(&echo_obd_ops, lvars.module_vars,
+ lprocfs_init_vars(echo, &lvars);
+ return class_register_type(&echo_obd_ops, NULL, lvars.module_vars,
OBD_ECHO_CLIENT_DEVICENAME);
}
-void echo_client_cleanup(void)
+void echo_client_exit(void)
{
class_unregister_type(OBD_ECHO_CLIENT_DEVICENAME);
}