/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (c) 2001-2003 Cluster File Systems, Inc.
+ * GPL HEADER START
*
- * This file is part of Lustre, http://www.lustre.org.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/
#define DEBUG_SUBSYSTEM S_ECHO
#ifdef __KERNEL__
-#include <linux/version.h>
-#include <linux/module.h>
-#include <linux/fs.h>
-#include <linux/completion.h>
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-#include <linux/iobuf.h>
-#endif
-#include <asm/div64.h>
+#include <libcfs/libcfs.h>
#else
#include <liblustre.h>
#endif
-#include <linux/obd.h>
-#include <linux/obd_support.h>
-#include <linux/obd_class.h>
-#include <linux/obd_echo.h>
-#include <linux/lustre_debug.h>
-#include <linux/lprocfs_status.h>
+#include <obd.h>
+#include <obd_support.h>
+#include <obd_class.h>
+#include <obd_echo.h>
+#include <lustre_debug.h>
+#include <lprocfs_status.h>
static obd_id last_object_id;
struct lov_stripe_md *lsm = eco->eco_lsm;
int i;
- printk (KERN_INFO "Lustre: %s: object %p: "LPX64", refs %d%s: "LPX64
+ CDEBUG(D_INFO, "Lustre: %s: object %p: "LPX64", refs %d%s: "LPX64
"=%u!%u\n", msg, eco, eco->eco_id, eco->eco_refcount,
eco->eco_deleted ? "(deleted) " : "",
lsm->lsm_object_id, lsm->lsm_stripe_size,
lsm->lsm_stripe_count);
for (i = 0; i < lsm->lsm_stripe_count; i++)
- printk (KERN_INFO "Lustre: @%2u:"LPX64"\n",
- lsm->lsm_oinfo[i].loi_ost_idx,
- lsm->lsm_oinfo[i].loi_id);
+ CDEBUG(D_INFO, "Lustre: @%2u:"LPX64"\n",
+ lsm->lsm_oinfo[i].loi_ost_idx,
+ lsm->lsm_oinfo[i].loi_id);
}
#endif
}
static int
-echo_copyout_lsm (struct lov_stripe_md *lsm, void *ulsm, int ulsm_nob)
+echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
{
- int nob;
+ struct lov_stripe_md *ulsm = _ulsm;
+ int nob, i;
nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
if (nob > ulsm_nob)
return (-EINVAL);
- if (copy_to_user (ulsm, lsm, nob))
+ if (copy_to_user (ulsm, lsm, sizeof(ulsm)))
return (-EFAULT);
- return (0);
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ if (copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
+ sizeof(lsm->lsm_oinfo[0])))
+ return (-EFAULT);
+ }
+ return 0;
}
static int
void *ulsm, int ulsm_nob)
{
struct echo_client_obd *ec = &obd->u.echo_client;
- int nob;
+ int i;
if (ulsm_nob < sizeof (*lsm))
return (-EINVAL);
if (copy_from_user (lsm, ulsm, sizeof (*lsm)))
return (-EFAULT);
- nob = lsm->lsm_stripe_count * sizeof (lsm->lsm_oinfo[0]);
-
- if (ulsm_nob < nob ||
- lsm->lsm_stripe_count > ec->ec_nstripes ||
+ if (lsm->lsm_stripe_count > ec->ec_nstripes ||
lsm->lsm_magic != LOV_MAGIC ||
- (lsm->lsm_stripe_size & (PAGE_SIZE - 1)) != 0 ||
+ (lsm->lsm_stripe_size & (~CFS_PAGE_MASK)) != 0 ||
((__u64)lsm->lsm_stripe_size * lsm->lsm_stripe_count > ~0UL))
return (-EINVAL);
- if (copy_from_user(lsm->lsm_oinfo,
- ((struct lov_stripe_md *)ulsm)->lsm_oinfo, nob))
- return (-EFAULT);
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ if (copy_from_user(lsm->lsm_oinfo[i],
+ ((struct lov_stripe_md *)ulsm)->lsm_oinfo[i],
+ sizeof(lsm->lsm_oinfo[0])))
+ return (-EFAULT);
+ }
return (0);
}
struct echo_client_obd *ec = &obd->u.echo_client;
LASSERT (eco->eco_refcount == 0);
- obd_free_memmd(ec->ec_exp, &eco->eco_lsm);
+ if (!eco->eco_lsm)
+ CERROR("No object %s\n", obd->obd_name);
+ else
+ obd_free_memmd(ec->ec_exp, &eco->eco_lsm);
OBD_FREE (eco, sizeof (*eco));
}
lsm->lsm_stripe_count = ec->ec_nstripes;
if (lsm->lsm_stripe_size == 0)
- lsm->lsm_stripe_size = PAGE_SIZE;
+ lsm->lsm_stripe_size = CFS_PAGE_SIZE;
- idx = ll_insecure_random_int();
+ idx = ll_rand();
/* setup stripes: indices + default ids if required */
for (i = 0; i < lsm->lsm_stripe_count; i++) {
- if (lsm->lsm_oinfo[i].loi_id == 0)
- lsm->lsm_oinfo[i].loi_id = lsm->lsm_object_id;
+ if (lsm->lsm_oinfo[i]->loi_id == 0)
+ lsm->lsm_oinfo[i]->loi_id = lsm->lsm_object_id;
- lsm->lsm_oinfo[i].loi_ost_idx =
+ lsm->lsm_oinfo[i]->loi_ost_idx =
(idx + i) % ec->ec_nstripes;
}
} else {
OBD_ALLOC(eco, sizeof(*eco));
+ if (!eco)
+ return (-ENOMEM);
eco->eco_device = obd;
lsm = NULL;
}
if (on_target) {
oa->o_gr = FILTER_GROUP_ECHO;
oa->o_valid |= OBD_MD_FLGROUP;
+
rc = obd_create(ec->ec_exp, oa, &lsm, oti);
if (rc != 0)
goto failed;
oa->o_id, on_target ? " (undoing create)" : "");
if (on_target)
- obd_destroy(ec->ec_exp, oa, lsm, oti);
+ obd_destroy(ec->ec_exp, oa, lsm, oti, NULL);
rc = -EEXIST;
goto failed;
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
return (0);
failed:
echo_free_object (eco);
+ if (rc)
+ CERROR("%s: err %d on create\n", obd->obd_name, rc);
return (rc);
}
spin_lock (&ec->ec_lock);
eco = echo_find_object_locked (obd, oa->o_id);
if (eco != NULL) {
- if (eco->eco_deleted) /* being deleted */
- return (-EAGAIN); /* (see comment in cleanup) */
+ if (eco->eco_deleted) { /* being deleted */
+ spin_unlock(&ec->ec_lock);/* (see comment in cleanup) */
+ return (-EAGAIN);
+ }
eco->eco_refcount++;
spin_unlock (&ec->ec_lock);
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
return (0);
}
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
return (0);
}
eco2, eco2->eco_id,
eco2->eco_lsm->lsm_stripe_size,
eco2->eco_lsm->lsm_stripe_count,
- eco2->eco_lsm->lsm_oinfo[0].loi_ost_idx,
+ eco2->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco2->eco_refcount, eco2->eco_deleted);
}
eco, eco->eco_id,
eco->eco_lsm->lsm_stripe_size,
eco->eco_lsm->lsm_stripe_count,
- eco->eco_lsm->lsm_oinfo[0].loi_ost_idx,
+ eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,
eco->eco_refcount, eco->eco_deleted);
if (eco->eco_refcount != 0 || !eco->eco_deleted) {
stripe_index = woffset / stripe_size;
- *idp = lsm->lsm_oinfo[stripe_index].loi_id;
+ *idp = lsm->lsm_oinfo[stripe_index]->loi_id;
*offp = offset * stripe_size + woffset % stripe_size;
}
-static void
-echo_client_page_debug_setup(struct lov_stripe_md *lsm,
- struct page *page, int rw, obd_id id,
+static void
+echo_client_page_debug_setup(struct lov_stripe_md *lsm,
+ cfs_page_t *page, int rw, obd_id id,
obd_off offset, obd_off count)
{
char *addr;
int delta;
/* no partial pages on the client */
- LASSERT(count == PAGE_SIZE);
+ LASSERT(count == CFS_PAGE_SIZE);
- addr = kmap(page);
+ addr = cfs_kmap(page);
- for (delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+ for (delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
if (rw == OBD_BRW_WRITE) {
stripe_off = offset + delta;
stripe_id = id;
stripe_off = 0xdeadbeef00c0ffeeULL;
stripe_id = 0xdeadbeef00c0ffeeULL;
}
- block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
+ block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,
stripe_off, stripe_id);
}
- kunmap(page);
+ cfs_kunmap(page);
}
-static int
-echo_client_page_debug_check(struct lov_stripe_md *lsm,
- struct page *page, obd_id id,
- obd_off offset, obd_off count)
+static int echo_client_page_debug_check(struct lov_stripe_md *lsm,
+ cfs_page_t *page, obd_id id,
+ obd_off offset, obd_off count)
{
obd_off stripe_off;
obd_id stripe_id;
int rc2;
/* no partial pages on the client */
- LASSERT(count == PAGE_SIZE);
+ LASSERT(count == CFS_PAGE_SIZE);
- addr = kmap(page);
+ addr = cfs_kmap(page);
- for (rc = delta = 0; delta < PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
+ for (rc = delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {
stripe_off = offset + delta;
stripe_id = id;
echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);
- rc2 = block_debug_check("test_brw",
- addr + delta, OBD_ECHO_BLOCK_SIZE,
+ rc2 = block_debug_check("test_brw",
+ addr + delta, OBD_ECHO_BLOCK_SIZE,
stripe_off, stripe_id);
- if (rc2 != 0)
+ if (rc2 != 0) {
+ CERROR ("Error in echo object "LPX64"\n", id);
rc = rc2;
+ }
}
- kunmap(page);
+ cfs_kunmap(page);
return rc;
}
obd_size count, struct obd_trans_info *oti)
{
struct echo_client_obd *ec = &obd->u.echo_client;
+ struct obd_info oinfo = { { { 0 } } };
obd_count npages;
struct brw_page *pga;
struct brw_page *pgp;
obd_off off;
int i;
int rc;
- int verify = 0;
+ int verify;
int gfp_mask;
- /* oa_id == ECHO_PERSISTENT_OBJID => speed test (no verification).
- * oa & 1 => use HIGHMEM */
+ verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
+ (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
- verify = (oa->o_id) != ECHO_PERSISTENT_OBJID;
- gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+ gfp_mask = ((oa->o_id & 2) == 0) ? CFS_ALLOC_STD : CFS_ALLOC_HIGHUSER;
LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
+ LASSERT(lsm != NULL);
+ LASSERT(lsm->lsm_object_id == oa->o_id);
if (count <= 0 ||
- (count & (PAGE_SIZE - 1)) != 0 ||
- (lsm != NULL &&
- lsm->lsm_object_id != oa->o_id))
+ (count & (~CFS_PAGE_MASK)) != 0)
return (-EINVAL);
/* XXX think again with misaligned I/O */
- npages = count >> PAGE_SHIFT;
+ npages = count >> CFS_PAGE_SHIFT;
OBD_ALLOC(pga, npages * sizeof(*pga));
if (pga == NULL)
for (i = 0, pgp = pga, off = offset;
i < npages;
- i++, pgp++, off += PAGE_SIZE) {
+ i++, pgp++, off += CFS_PAGE_SIZE) {
LASSERT (pgp->pg == NULL); /* for cleanup */
rc = -ENOMEM;
- pgp->pg = alloc_pages (gfp_mask, 0);
+ OBD_PAGE_ALLOC(pgp->pg, gfp_mask);
if (pgp->pg == NULL)
goto out;
- pgp->count = PAGE_SIZE;
+ pgp->count = CFS_PAGE_SIZE;
pgp->off = off;
pgp->flag = 0;
if (verify)
- echo_client_page_debug_setup(lsm, pgp->pg, rw,
+ echo_client_page_debug_setup(lsm, pgp->pg, rw,
oa->o_id, off, pgp->count);
}
- rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti);
+ oinfo.oi_oa = oa;
+ oinfo.oi_md = lsm;
+ rc = obd_brw(rw, ec->ec_exp, &oinfo, npages, pga, oti);
out:
if (rc != 0 || rw != OBD_BRW_READ)
if (vrc != 0 && rc == 0)
rc = vrc;
}
- __free_pages(pgp->pg, 0);
+ OBD_PAGE_FREE(pgp->pg);
}
OBD_FREE(pga, npages * sizeof(*pga));
return (rc);
}
-#ifdef __KERNEL__
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-static int echo_client_ubrw(struct obd_device *obd, int rw,
- struct obdo *oa, struct lov_stripe_md *lsm,
- obd_off offset, obd_size count, char *buffer,
- struct obd_trans_info *oti)
-{
- struct echo_client_obd *ec = &obd->u.echo_client;
- obd_count npages;
- struct brw_page *pga;
- struct brw_page *pgp;
- obd_off off;
- struct kiobuf *kiobuf;
- int i;
- int rc;
-
- LASSERT (rw == OBD_BRW_WRITE ||
- rw == OBD_BRW_READ);
-
- /* NB: for now, only whole pages, page aligned */
-
- if (count <= 0 ||
- ((long)buffer & (PAGE_SIZE - 1)) != 0 ||
- (count & (PAGE_SIZE - 1)) != 0 ||
- (lsm != NULL && lsm->lsm_object_id != oa->o_id))
- return (-EINVAL);
-
- /* XXX think again with misaligned I/O */
- npages = count >> PAGE_SHIFT;
-
- OBD_ALLOC(pga, npages * sizeof(*pga));
- if (pga == NULL)
- return (-ENOMEM);
-
- rc = alloc_kiovec (1, &kiobuf);
- if (rc != 0)
- goto out_1;
-
- rc = map_user_kiobuf ((rw == OBD_BRW_READ) ? READ : WRITE,
- kiobuf, (unsigned long)buffer, count);
- if (rc != 0)
- goto out_2;
-
- LASSERT (kiobuf->offset == 0);
- LASSERT (kiobuf->nr_pages == npages);
-
- for (i = 0, off = offset, pgp = pga;
- i < npages;
- i++, off += PAGE_SIZE, pgp++) {
- pgp->off = off;
- pgp->pg = kiobuf->maplist[i];
- pgp->count = PAGE_SIZE;
- pgp->flag = 0;
- }
-
- rc = obd_brw(rw, ec->ec_exp, oa, lsm, npages, pga, oti);
-
- // if (rw == OBD_BRW_READ)
- // mark_dirty_kiobuf (kiobuf, count);
-
- unmap_kiobuf (kiobuf);
- out_2:
- free_kiovec (1, &kiobuf);
- out_1:
- OBD_FREE(pga, npages * sizeof(*pga));
- return (rc);
-}
-#else
-static int echo_client_ubrw(struct obd_device *obd, int rw,
- struct obdo *oa, struct lov_stripe_md *lsm,
- obd_off offset, obd_size count, char *buffer,
- struct obd_trans_info *oti)
-{
-#warning "echo_client_ubrw() needs to be ported on 2.6 yet"
- LBUG();
- return 0;
-}
-#endif
-#endif
-
struct echo_async_state;
#define EAP_MAGIC 79277927
struct echo_async_page {
int eap_magic;
- struct page *eap_page;
+ cfs_page_t *eap_page;
void *eap_cookie;
obd_off eap_off;
struct echo_async_state *eap_eas;
struct list_head eap_item;
};
+#define EAP_FROM_COOKIE(c) \
+ (LASSERT(((struct echo_async_page *)(c))->eap_magic == EAP_MAGIC), \
+ (struct echo_async_page *)(c))
+
struct echo_async_state {
spinlock_t eas_lock;
obd_off eas_next_offset;
obd_off eas_end_offset;
int eas_in_flight;
int eas_rc;
- wait_queue_head_t eas_waitq;
+ cfs_waitq_t eas_waitq;
struct list_head eas_avail;
struct obdo eas_oa;
struct lov_stripe_md *eas_lsm;
static int eas_should_wake(struct echo_async_state *eas)
{
- unsigned long flags;
int rc = 0;
- spin_lock_irqsave(&eas->eas_lock, flags);
+
+ spin_lock(&eas->eas_lock);
if (eas->eas_rc == 0 && !list_empty(&eas->eas_avail))
rc = 1;
- spin_unlock_irqrestore(&eas->eas_lock, flags);
+ spin_unlock(&eas->eas_lock);
return rc;
};
-struct echo_async_page *eap_from_cookie(void *cookie)
-{
- struct echo_async_page *eap = cookie;
- if (eap->eap_magic != EAP_MAGIC)
- return ERR_PTR(-EINVAL);
- return eap;
-};
-
static int ec_ap_make_ready(void *data, int cmd)
{
/* our pages are issued ready */
{
/* our pages are issued with a stable count */
LBUG();
- return PAGE_SIZE;
+ return CFS_PAGE_SIZE;
}
static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
{
- struct echo_async_page *eap;
- eap = eap_from_cookie(data);
- if (IS_ERR(eap))
- return;
+ struct echo_async_page *eap = EAP_FROM_COOKIE(data);
memcpy(oa, &eap->eap_eas->eas_oa, sizeof(*oa));
}
-static void ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
+static int ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
{
- struct echo_async_page *eap = eap_from_cookie(data);
+ struct echo_async_page *eap = EAP_FROM_COOKIE(data);
struct echo_async_state *eas;
- unsigned long flags;
- if (IS_ERR(eap))
- return;
eas = eap->eap_eas;
if (cmd == OBD_BRW_READ &&
- eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID)
- echo_client_page_debug_check(eas->eas_lsm, eap->eap_page,
+ eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID &&
+ (eas->eas_oa.o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (eas->eas_oa.o_flags & OBD_FL_DEBUG_CHECK) != 0)
+ echo_client_page_debug_check(eas->eas_lsm, eap->eap_page,
eas->eas_oa.o_id, eap->eap_off,
- PAGE_SIZE);
+ CFS_PAGE_SIZE);
- spin_lock_irqsave(&eas->eas_lock, flags);
+ spin_lock(&eas->eas_lock);
if (rc && !eas->eas_rc)
eas->eas_rc = rc;
eas->eas_in_flight--;
list_add(&eap->eap_item, &eas->eas_avail);
- wake_up(&eas->eas_waitq);
- spin_unlock_irqrestore(&eas->eas_lock, flags);
+ cfs_waitq_signal(&eas->eas_waitq);
+ spin_unlock(&eas->eas_lock);
+ return 0;
}
static struct obd_async_page_ops ec_async_page_ops = {
obd_count npages, i;
struct echo_async_page *eap;
struct echo_async_state eas;
- struct list_head *pos, *n;
int rc = 0;
- unsigned long flags;
- LIST_HEAD(pages);
+ struct echo_async_page **aps = NULL;
+
+ ENTRY;
#if 0
int verify;
int gfp_mask;
- /* oa_id == 0 => speed test (no verification) else...
- * oa & 1 => use HIGHMEM
- */
- verify = (oa->o_id != 0);
- gfp_mask = ((oa->o_id & 1) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
+
+ verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&
+ (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
+
+ gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;
#endif
LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);
if (count <= 0 ||
- (count & (PAGE_SIZE - 1)) != 0 ||
+ (count & (~CFS_PAGE_MASK)) != 0 ||
(lsm != NULL &&
lsm->lsm_object_id != oa->o_id))
return (-EINVAL);
/* XXX think again with misaligned I/O */
- npages = batching >> PAGE_SHIFT;
+ npages = batching >> CFS_PAGE_SHIFT;
memcpy(&eas.eas_oa, oa, sizeof(*oa));
eas.eas_next_offset = offset;
eas.eas_end_offset = offset + count;
spin_lock_init(&eas.eas_lock);
- init_waitqueue_head(&eas.eas_waitq);
+ cfs_waitq_init(&eas.eas_waitq);
eas.eas_in_flight = 0;
eas.eas_rc = 0;
eas.eas_lsm = lsm;
- INIT_LIST_HEAD(&eas.eas_avail);
+ CFS_INIT_LIST_HEAD(&eas.eas_avail);
+
+ OBD_ALLOC(aps, npages * sizeof aps[0]);
+ if (aps == NULL)
+ return (-ENOMEM);
/* prepare the group of pages that we're going to be keeping
* in flight */
for (i = 0; i < npages; i++) {
- struct page *page = alloc_page(GFP_KERNEL);
+ cfs_page_t *page;
+ OBD_PAGE_ALLOC(page, CFS_ALLOC_STD);
if (page == NULL)
GOTO(out, rc = -ENOMEM);
- page->private = 0;
- list_add_tail(&PAGE_LIST(page), &pages);
-
OBD_ALLOC(eap, sizeof(*eap));
- if (eap == NULL)
+ if (eap == NULL) {
+ OBD_PAGE_FREE(page);
GOTO(out, rc = -ENOMEM);
+ }
eap->eap_magic = EAP_MAGIC;
eap->eap_page = page;
eap->eap_eas = &eas;
- page->private = (unsigned long)eap;
list_add_tail(&eap->eap_item, &eas.eas_avail);
+ aps[i] = eap;
}
/* first we spin queueing io and being woken by its completion */
- spin_lock_irqsave(&eas.eas_lock, flags);
+ spin_lock(&eas.eas_lock);
for(;;) {
int rc;
/* sleep until we have a page to send */
- spin_unlock_irqrestore(&eas.eas_lock, flags);
- rc = wait_event_interruptible(eas.eas_waitq,
+ spin_unlock(&eas.eas_lock);
+ rc = wait_event_interruptible(eas.eas_waitq,
eas_should_wake(&eas));
- spin_lock_irqsave(&eas.eas_lock, flags);
+ spin_lock(&eas.eas_lock);
if (rc && !eas.eas_rc)
eas.eas_rc = rc;
if (eas.eas_rc)
eap = list_entry(eas.eas_avail.next, struct echo_async_page,
eap_item);
list_del(&eap->eap_item);
- spin_unlock_irqrestore(&eas.eas_lock, flags);
+ spin_unlock(&eas.eas_lock);
/* unbind the eap from its old page offset */
if (eap->eap_cookie != NULL) {
- obd_teardown_async_page(exp, lsm, NULL,
+ obd_teardown_async_page(exp, lsm, NULL,
eap->eap_cookie);
eap->eap_cookie = NULL;
}
- eas.eas_next_offset += PAGE_SIZE;
+ eas.eas_next_offset += CFS_PAGE_SIZE;
eap->eap_off = eas.eas_next_offset;
rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,
eap->eap_off, &ec_async_page_ops,
- eap, &eap->eap_cookie);
+ eap, &eap->eap_cookie, 1, NULL);
if (rc) {
- spin_lock_irqsave(&eas.eas_lock, flags);
+ spin_lock(&eas.eas_lock);
eas.eas_rc = rc;
break;
}
- if (oa->o_id != ECHO_PERSISTENT_OBJID)
- echo_client_page_debug_setup(lsm, eap->eap_page, rw,
- oa->o_id,
- eap->eap_off, PAGE_SIZE);
+ if (oa->o_id != ECHO_PERSISTENT_OBJID &&
+ (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0)
+ echo_client_page_debug_setup(lsm, eap->eap_page, rw,
+ oa->o_id,
+ eap->eap_off, CFS_PAGE_SIZE);
/* always asserts urgent, which isn't quite right */
rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie,
- rw, 0, PAGE_SIZE, 0,
+ rw, 0, CFS_PAGE_SIZE, 0,
ASYNC_READY | ASYNC_URGENT |
ASYNC_COUNT_STABLE);
- spin_lock_irqsave(&eas.eas_lock, flags);
+ spin_lock(&eas.eas_lock);
if (rc && !eas.eas_rc) {
eas.eas_rc = rc;
break;
eas.eas_in_flight++;
if (eas.eas_next_offset == eas.eas_end_offset)
break;
- }
+ }
/* still hold the eas_lock here.. */
/* now we just spin waiting for all the rpcs to complete */
while(eas.eas_in_flight) {
- spin_unlock_irqrestore(&eas.eas_lock, flags);
- wait_event_interruptible(eas.eas_waitq,
+ spin_unlock(&eas.eas_lock);
+ wait_event_interruptible(eas.eas_waitq,
eas.eas_in_flight == 0);
- spin_lock_irqsave(&eas.eas_lock, flags);
+ spin_lock(&eas.eas_lock);
}
- spin_unlock_irqrestore(&eas.eas_lock, flags);
+ spin_unlock(&eas.eas_lock);
out:
- list_for_each_safe(pos, n, &pages) {
- struct page *page = list_entry(pos, struct page,
- PAGE_LIST_ENTRY);
+ if (aps != NULL) {
+ for (i = 0; i < npages; ++ i) {
+ cfs_page_t *page;
- list_del(&PAGE_LIST(page));
- if (page->private != 0) {
- eap = (struct echo_async_page *)page->private;
+ eap = aps[i];
+ page = eap->eap_page;
if (eap->eap_cookie != NULL)
- obd_teardown_async_page(exp, lsm, NULL,
+ obd_teardown_async_page(exp, lsm, NULL,
eap->eap_cookie);
OBD_FREE(eap, sizeof(*eap));
+ OBD_PAGE_FREE(page);
}
- __free_page(page);
+ OBD_FREE(aps, npages * sizeof aps[0]);
}
RETURN(rc);
static int echo_client_prep_commit(struct obd_export *exp, int rw,
struct obdo *oa, struct lov_stripe_md *lsm,
- obd_off offset, obd_size count,
+ obd_off offset, obd_size count,
obd_size batch, struct obd_trans_info *oti)
{
struct obd_ioobj ioo;
int i, ret = 0;
ENTRY;
- if (count <= 0 || (count & (PAGE_SIZE - 1)) != 0 ||
+ if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
(lsm != NULL && lsm->lsm_object_id != oa->o_id))
RETURN(-EINVAL);
- npages = batch >> PAGE_SHIFT;
- tot_pages = count >> PAGE_SHIFT;
+ npages = batch >> CFS_PAGE_SHIFT;
+ tot_pages = count >> CFS_PAGE_SHIFT;
OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));
OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));
if (tot_pages < npages)
npages = tot_pages;
- for (i = 0; i < npages; i++, off += PAGE_SIZE) {
+ for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) {
rnb[i].offset = off;
- rnb[i].len = PAGE_SIZE;
+ rnb[i].len = CFS_PAGE_SIZE;
}
- /* XXX this can't be the best.. */
- memset(oti, 0, sizeof(*oti));
ioo.ioo_bufcnt = npages;
+ oti->oti_transno = 0;
- ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti);
+ ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti,
+ NULL);
if (ret != 0)
GOTO(out, ret);
for (i = 0; i < npages; i++) {
- struct page *page = lnb[i].page;
+ cfs_page_t *page = lnb[i].page;
/* read past eof? */
if (page == NULL && lnb[i].rc == 0)
continue;
- if (oa->o_id == ECHO_PERSISTENT_OBJID)
+ if (oa->o_id == ECHO_PERSISTENT_OBJID ||
+ (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||
+ (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)
continue;
if (rw == OBD_BRW_WRITE)
{
struct obd_device *obd = class_exp2obd(exp);
struct echo_client_obd *ec = &obd->u.echo_client;
- struct obd_trans_info dummy_oti;
+ struct obd_trans_info dummy_oti = { .oti_thread_id = -1 };
struct ec_object *eco;
int rc;
ENTRY;
if (rc)
RETURN(rc);
- memset(&dummy_oti, 0, sizeof(dummy_oti));
-
data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE;
data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP;
data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO;
switch((long)data->ioc_pbuf1) {
case 1:
- if (data->ioc_pbuf2 == NULL) { // NULL user data pointer
- rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
+ rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,
eco->eco_lsm, data->ioc_offset,
data->ioc_count, &dummy_oti);
- } else {
-#ifdef __KERNEL__
- rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,
- eco->eco_lsm, data->ioc_offset,
- data->ioc_count, data->ioc_pbuf2,
- &dummy_oti);
-#endif
- }
break;
case 2:
rc = echo_client_async_page(ec->ec_exp, rw, &data->ioc_obdo1,
{
struct obd_device *obd = exp->exp_obd;
struct echo_client_obd *ec = &obd->u.echo_client;
- struct lustre_handle *ulh = obdo_handle (oa);
+ struct lustre_handle *ulh = &oa->o_handle;
+ struct ldlm_enqueue_info einfo = { 0 };
+ struct obd_info oinfo = { { { 0 } } };
struct ec_object *eco;
struct ec_lock *ecl;
- int flags;
int rc;
if (!(mode == LCK_PR || mode == LCK_PW))
return -EINVAL;
- if ((offset & (PAGE_SIZE - 1)) != 0 ||
- (nob & (PAGE_SIZE - 1)) != 0)
+ if ((offset & (~CFS_PAGE_MASK)) != 0 ||
+ (nob & (~CFS_PAGE_MASK)) != 0)
return -EINVAL;
rc = echo_get_object (&eco, obd, oa);
ecl->ecl_policy.l_extent.end =
(nob == 0) ? ((obd_off) -1) : (offset + nob - 1);
- flags = 0;
- rc = obd_enqueue(ec->ec_exp, eco->eco_lsm, LDLM_EXTENT,
- &ecl->ecl_policy, mode, &flags, echo_ldlm_callback,
- ldlm_completion_ast, NULL, eco, sizeof(struct ost_lvb),
- lustre_swab_ost_lvb, &ecl->ecl_lock_handle);
+ einfo.ei_type = LDLM_EXTENT;
+ einfo.ei_mode = mode;
+ einfo.ei_cb_bl = echo_ldlm_callback;
+ einfo.ei_cb_cp = ldlm_completion_ast;
+ einfo.ei_cb_gl = NULL;
+ einfo.ei_cbdata = eco;
+
+ oinfo.oi_policy = ecl->ecl_policy;
+ oinfo.oi_lockh = &ecl->ecl_lock_handle;
+ oinfo.oi_md = eco->eco_lsm;
+ rc = obd_enqueue(ec->ec_exp, &oinfo, &einfo, NULL);
if (rc != 0)
goto failed_1;
{
struct obd_device *obd = exp->exp_obd;
struct echo_client_obd *ec = &obd->u.echo_client;
- struct lustre_handle *ulh = obdo_handle (oa);
+ struct lustre_handle *ulh = &oa->o_handle;
struct ec_lock *ecl = NULL;
int found = 0;
struct list_head *el;
int i;
ENTRY;
+ unlock_kernel();
+
memset(&dummy_oti, 0, sizeof(dummy_oti));
obd = exp->exp_obd;
oa = &data->ioc_obdo1;
oa->o_gr = FILTER_GROUP_ECHO;
oa->o_valid |= OBD_MD_FLGROUP;
- rc = obd_destroy(ec->ec_exp, oa, eco->eco_lsm,
- &dummy_oti);
+ rc = obd_destroy(ec->ec_exp, oa, eco->eco_lsm,
+ &dummy_oti, NULL);
if (rc == 0)
eco->eco_deleted = 1;
echo_put_object(eco);
case OBD_IOC_GETATTR:
rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
if (rc == 0) {
- rc = obd_getattr(ec->ec_exp, &data->ioc_obdo1,
- eco->eco_lsm);
+ struct obd_info oinfo = { { { 0 } } };
+ oinfo.oi_md = eco->eco_lsm;
+ oinfo.oi_oa = &data->ioc_obdo1;
+ rc = obd_getattr(ec->ec_exp, &oinfo);
echo_put_object(eco);
}
GOTO(out, rc);
rc = echo_get_object (&eco, obd, &data->ioc_obdo1);
if (rc == 0) {
- rc = obd_setattr(ec->ec_exp, &data->ioc_obdo1,
- eco->eco_lsm, NULL);
+ struct obd_info oinfo = { { { 0 } } };
+ oinfo.oi_oa = &data->ioc_obdo1;
+ oinfo.oi_md = eco->eco_lsm;
+
+ rc = obd_setattr(ec->ec_exp, &oinfo, NULL);
echo_put_object(eco);
}
GOTO(out, rc);
out:
/* XXX this should be in a helper also called by target_send_reply */
- for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
+ for (ack_lock = dummy_oti.oti_ack_locks, i = 0; i < 4;
i++, ack_lock++) {
if (!ack_lock->mode)
break;
ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
}
+ lock_kernel();
+
return rc;
}
-static int
-echo_client_setup(struct obd_device *obddev, obd_count len, void *buf)
+static int echo_client_setup(struct obd_device *obddev, struct lustre_cfg *lcfg)
{
- struct lustre_cfg* lcfg = buf;
struct echo_client_obd *ec = &obddev->u.echo_client;
struct obd_device *tgt;
struct lustre_handle conn = {0, };
struct obd_uuid echo_uuid = { "ECHO_UUID" };
+ struct obd_connect_data *ocd = NULL;
int rc;
ENTRY;
- if (lcfg->lcfg_inllen1 < 1) {
+ if (lcfg->lcfg_bufcount < 2 || LUSTRE_CFG_BUFLEN(lcfg, 1) < 1) {
CERROR("requires a TARGET OBD name\n");
RETURN(-EINVAL);
}
- tgt = class_name2obd(lcfg->lcfg_inlbuf1);
+ tgt = class_name2obd(lustre_cfg_string(lcfg, 1));
if (!tgt || !tgt->obd_attached || !tgt->obd_set_up) {
CERROR("device not attached or not set up (%s)\n",
- lcfg->lcfg_inlbuf1);
+ lustre_cfg_string(lcfg, 1));
RETURN(-EINVAL);
}
spin_lock_init (&ec->ec_lock);
- INIT_LIST_HEAD (&ec->ec_objects);
+ CFS_INIT_LIST_HEAD (&ec->ec_objects);
ec->ec_unique = 0;
- rc = obd_connect(&conn, tgt, &echo_uuid, 0);
- if (rc) {
- CERROR("fail to connect to device %s\n", lcfg->lcfg_inlbuf1);
+ OBD_ALLOC(ocd, sizeof(*ocd));
+ if (ocd == NULL) {
+ CERROR("Can't alloc ocd connecting to %s\n",
+ lustre_cfg_string(lcfg, 1));
+ return -ENOMEM;
+ }
+
+ ocd->ocd_connect_flags = OBD_CONNECT_VERSION | OBD_CONNECT_REQPORTAL;
+ ocd->ocd_version = LUSTRE_VERSION_CODE;
+ ocd->ocd_group = FILTER_GROUP_ECHO;
+
+ rc = obd_connect(NULL, &conn, tgt, &echo_uuid, ocd, NULL);
+
+ OBD_FREE(ocd, sizeof(*ocd));
+
+ if (rc != 0) {
+ CERROR("fail to connect to device %s\n",
+ lustre_cfg_string(lcfg, 1));
return (rc);
}
ec->ec_exp = class_conn2export(&conn);
RETURN(rc);
}
-static int echo_client_cleanup(struct obd_device *obddev, int flags)
+static int echo_client_cleanup(struct obd_device *obddev)
{
struct list_head *el;
struct ec_object *eco;
echo_put_object(eco);
}
- rc = obd_disconnect(ec->ec_exp, 0);
+ rc = obd_disconnect(ec->ec_exp);
if (rc != 0)
CERROR("fail to disconnect device: %d\n", rc);
RETURN(rc);
}
-static int echo_client_connect(struct lustre_handle *conn,
+static int echo_client_connect(const struct lu_env *env,
+ struct lustre_handle *conn,
struct obd_device *src, struct obd_uuid *cluuid,
- unsigned long connect_flags)
+ struct obd_connect_data *data, void *localdata)
{
struct obd_export *exp;
int rc;
+ ENTRY;
rc = class_connect(conn, src, cluuid);
if (rc == 0) {
exp = class_conn2export(conn);
- INIT_LIST_HEAD(&exp->exp_ec_data.eced_locks);
+ CFS_INIT_LIST_HEAD(&exp->exp_ec_data.eced_locks);
class_export_put(exp);
}
RETURN (rc);
}
-static int echo_client_disconnect(struct obd_export *exp, int flags)
+static int echo_client_disconnect(struct obd_export *exp)
{
struct obd_device *obd;
struct echo_client_obd *ec;
OBD_FREE (ecl, sizeof (*ecl));
}
- rc = class_disconnect(exp, 0);
+ rc = class_disconnect(exp);
GOTO(out, rc);
out:
return rc;
int echo_client_init(void)
{
- struct lprocfs_static_vars lvars;
+ struct lprocfs_static_vars lvars = { 0 };
- lprocfs_init_vars(echo, &lvars);
+ lprocfs_echo_init_vars(&lvars);
return class_register_type(&echo_obd_ops, NULL, lvars.module_vars,
- OBD_ECHO_CLIENT_DEVICENAME);
+ LUSTRE_ECHO_CLIENT_NAME, NULL);
}
void echo_client_exit(void)
{
- class_unregister_type(OBD_ECHO_CLIENT_DEVICENAME);
+ class_unregister_type(LUSTRE_ECHO_CLIENT_NAME);
}