#define EXPORT_SYMTAB
#define DEBUG_SUBSYSTEM S_OSC
+#ifdef __KERNEL__
#include <linux/version.h>
#include <linux/module.h>
#include <linux/mm.h>
#include <linux/lustre_dlm.h>
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
#include <linux/workqueue.h>
+#include <linux/smp_lock.h>
+#else
+#include <linux/locks.h>
#endif
+#else
+#include <liblustre.h>
+#endif
+
#include <linux/kp30.h>
#include <linux/lustre_mds.h> /* for mds_objid */
#include <linux/obd_ost.h>
#include <portals/lib-types.h> /* for PTL_MD_MAX_IOV */
#include <linux/lprocfs_status.h>
+/* It is important that ood_fh remain the first item in this structure: that
+ * way, we don't have to re-pack the obdo's inline data before we send it to
+ * the server, we can just send the whole struct unaltered. */
+#define OSC_OBDO_DATA_MAGIC 0xD15EA5ED
+struct osc_obdo_data {
+ struct lustre_handle ood_fh;
+ struct ptlrpc_request *ood_request;
+ __u32 ood_magic;
+};
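+
+/* A sketch of this struct's lifecycle, per osc_open/osc_close below:
+ * osc_open copies the server's file handle into ood_fh, takes an extra
+ * reference on the open request so it can be replayed, stamps ood_magic,
+ * and memcpys the whole struct into oa->o_inline; osc_close finds it
+ * there, verifies the magic, and releases the saved request. */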
+#include <linux/obd_lov.h> /* just for the startup assertion; is that wrong? */
+
+static int send_sync(struct obd_import *imp, struct ll_fid *rootfid,
+ int level, int msg_flags)
+{
+ struct ptlrpc_request *req;
+ struct mds_body *body;
+ int rc, size = sizeof(*body);
+ ENTRY;
+
+ req = ptlrpc_prep_req(imp, OST_SYNCFS, 1, &size, NULL);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ req->rq_level = level;
+ req->rq_replen = lustre_msg_size(1, &size);
+
+ req->rq_reqmsg->flags |= msg_flags;
+ rc = ptlrpc_queue_wait(req);
+
+ if (!rc) {
+ CDEBUG(D_NET, "last_committed="LPU64
+ ", last_xid="LPU64"\n",
+ req->rq_repmsg->last_committed,
+ req->rq_repmsg->last_xid);
+ }
+
+ EXIT;
+ out:
+ ptlrpc_req_finished(req);
+ return rc;
+}
+
+static int signal_completed_replay(struct obd_import *imp)
+{
+ struct ll_fid fid; /* ignored by send_sync; OST_SYNCFS carries no fid */
+
+ return send_sync(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
+}
+
static int osc_attach(struct obd_device *dev, obd_count len, void *data)
{
struct lprocfs_static_vars lvars;
RETURN(lsm_size);
}
-inline void oti_from_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
+inline void oti_from_request(struct obd_trans_info *oti,
+ struct ptlrpc_request *req)
{
if (oti && req->rq_repmsg)
oti->oti_transno = NTOH__u64(req->rq_repmsg->transno);
if (!request)
RETURN(-ENOMEM);
-#warning FIXME: request->rq_flags |= PTL_RPC_FL_REPLAY;
+ request->rq_flags |= PTL_RPC_FL_REPLAY;
body = lustre_msg_buf(request->rq_reqmsg, 0);
#warning FIXME: pack only valid fields instead of memcpy, endianness
memcpy(&body->oa, oa, sizeof(*oa));
if (rc)
GOTO(out, rc);
- body = lustre_msg_buf(request->rq_repmsg, 0);
- CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
- if (oa)
+ if (oa) {
+ struct osc_obdo_data ood;
+ body = lustre_msg_buf(request->rq_repmsg, 0);
memcpy(oa, &body->oa, sizeof(*oa));
+ /* If the open succeeded, we'd better have a handle */
+ /* BlueArc OSTs don't send back (o_valid | FLHANDLE). sigh.
+ * Temporary workaround until fixed. -phil 24 Feb 03 */
+ //LASSERT(oa->o_valid & OBD_MD_FLHANDLE);
+ oa->o_valid |= OBD_MD_FLHANDLE;
+
+ memcpy(&ood.ood_fh, obdo_handle(oa), sizeof(ood.ood_fh));
+ ood.ood_request = ptlrpc_request_addref(request);
+ ood.ood_magic = OSC_OBDO_DATA_MAGIC;
+
+ /* Save this data in the request; it will be passed back to us
+ * in future obdos. This memcpy is guaranteed to be safe,
+ * because we assert at startup (in osc_init) that sizeof(ood)
+ * fits inside oa->o_inline. */
+ memcpy(&oa->o_inline, &ood, sizeof(ood));
+ }
+
EXIT;
out:
ptlrpc_req_finished(request);
static int osc_close(struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *md, struct obd_trans_info *oti)
{
+ struct obd_import *import = class_conn2cliimp(conn);
struct ptlrpc_request *request;
struct ost_body *body;
+ struct osc_obdo_data *ood;
+ unsigned long flags;
int rc, size = sizeof(*body);
ENTRY;
- request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_CLOSE, 1, &size,
- NULL);
+ LASSERT(oa != NULL);
+ ood = (struct osc_obdo_data *)&oa->o_inline;
+ LASSERT(ood->ood_magic == OSC_OBDO_DATA_MAGIC);
+
+ request = ptlrpc_prep_req(import, OST_CLOSE, 1, &size, NULL);
if (!request)
RETURN(-ENOMEM);
request->rq_replen = lustre_msg_size(1, &size);
rc = ptlrpc_queue_wait(request);
- if (rc)
+ if (rc) {
+ /* FIXME: Does this mean that the file is still open locally?
+ * If not (and I suspect not), we need to clean up the
+ * saved open request below. */
GOTO(out, rc);
+ }
+
+ spin_lock_irqsave(&import->imp_lock, flags);
+ ood->ood_request->rq_flags &= ~PTL_RPC_FL_REPLAY;
+ /* see comments in llite/file.c:ll_mdc_close() */
+ if (ood->ood_request->rq_transno) {
+ LBUG(); /* this can't happen yet */
+ if (!request->rq_transno) {
+ request->rq_transno = ood->ood_request->rq_transno;
+ ptlrpc_retain_replayable_request(request, import);
+ }
+ spin_unlock_irqrestore(&import->imp_lock, flags);
+ } else {
+ spin_unlock_irqrestore(&import->imp_lock, flags);
+ ptlrpc_req_finished(ood->ood_request);
+ }
body = lustre_msg_buf(request->rq_repmsg, 0);
- CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
- if (oa)
- memcpy(oa, &body->oa, sizeof(*oa));
+ memcpy(oa, &body->oa, sizeof(*oa));
EXIT;
out:
struct list_head *tmp;
ENTRY;
- /* This feels wrong to me. */
list_for_each(tmp, &desc->bd_page_list) {
struct ptlrpc_bulk_page *bulk;
bulk = list_entry(tmp, struct ptlrpc_bulk_page, bp_link);
EXIT;
}
+/*
+ * This is called when there was a bulk error return. However, we don't know
+ * whether the bulk completed or not. We cancel the portals bulk descriptors,
+ * so that if the OST decides to send them later we don't double free. Then
+ * remove this descriptor from the set so that the set callback doesn't wait
+ * forever for the last CB_PHASE_FINISH to be called, and finally drop all of
+ * the bulk descriptor references.
+ */
+static void osc_ptl_ev_abort(struct ptlrpc_bulk_desc *desc)
+{
+ ENTRY;
+
+ LASSERT(desc->bd_brw_set != NULL);
+
+ ptlrpc_abort_bulk(desc);
+ obd_brw_set_del(desc);
+ unmap_and_decref_bulk_desc(desc);
+
+ EXIT;
+}
+
static int osc_brw_read(struct lustre_handle *conn, struct lov_stripe_md *lsm,
obd_count page_count, struct brw_page *pga,
struct obd_brw_set *set)
struct ptlrpc_bulk_desc *desc = NULL;
struct ost_body *body;
int rc, size[3] = {sizeof(*body)}, mapped = 0;
- unsigned long flags;
struct obd_ioobj *iooptr;
void *nioptr;
__u32 xid;
ENTRY;
+restart_bulk:
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(struct niobuf_remote);
RETURN(-ENOMEM);
body = lustre_msg_buf(request->rq_reqmsg, 0);
+ /* CHECKSUM_BULK is a compile-time 0 or 1, so this requests a
+ * checksum only when bulk checksumming is compiled in */
+ body->oa.o_valid = HTON__u32(OBD_MD_FLCKSUM * CHECKSUM_BULK);
desc = ptlrpc_prep_bulk(connection);
if (!desc)
ost_pack_ioo(&iooptr, lsm, page_count);
/* end almost identical to brw_write case */
- spin_lock_irqsave(&imp->imp_lock, flags);
- xid = ++imp->imp_last_xid; /* single xid for all pages */
- spin_unlock_irqrestore(&imp->imp_lock, flags);
+ xid = ptlrpc_next_xid(); /* single xid for all pages */
obd_kmap_get(page_count, 0);
for (mapped = 0; mapped < page_count; mapped++) {
struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out_unmap, rc = -ENOMEM);
+ if (bulk == NULL) {
+ unmap_and_decref_bulk_desc(desc);
+ GOTO(out_req, rc = -ENOMEM);
+ }
bulk->bp_xid = xid; /* single xid for all pages */
* Register the bulk first, because the reply could arrive out of order,
* and we want to be ready for the bulk data.
*
- * One reference is released when brw_finish is complete, the other when
- * the caller removes us from the "set" list.
+ * One reference is released when osc_ptl_ev_hdlr() is called by
+ * portals, the other when the caller removes us from the "set" list.
*
* On error, we never do the brw_finish, so we handle all decrefs.
*/
OBD_FAIL_OSC_BRW_READ_BULK);
} else {
rc = ptlrpc_register_bulk_put(desc);
- if (rc)
- GOTO(out_unmap, rc);
+ if (rc) {
+ unmap_and_decref_bulk_desc(desc);
+ GOTO(out_req, rc);
+ }
obd_brw_set_add(set, desc);
}
+ request->rq_flags |= PTL_RPC_FL_NO_RESEND;
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
- /*
- * XXX: If there is an error during the processing of the callback,
- * such as a timeout in a sleep that it performs, brw_finish
- * will never get called, and we'll leak the desc, fail to kunmap
- * things, cats will live with dogs. One solution would be to
- * export brw_finish as osc_brw_finish, so that the timeout case
- * and its kin could call it for proper cleanup. An alternative
- * would be for an error return from the callback to cause us to
- * clean up, but that doesn't help the truly async cases (like
- * LOV), which will immediately return from their PHASE_START
- * callback, before any such cleanup-requiring error condition can
- * be detected.
- */
+ /* XXX bug 937 here */
+ if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
+ DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
+ ptlrpc_req_finished(request);
+ goto restart_bulk;
+ }
+
+ if (rc) {
+ osc_ptl_ev_abort(desc);
+ GOTO(out_req, rc);
+ }
+
+#if CHECKSUM_BULK
+ body = lustre_msg_buf(request->rq_repmsg, 0);
+ if (body->oa.o_valid & NTOH__u32(OBD_MD_FLCKSUM)) {
+ static int cksum_counter;
+ __u64 server_cksum = NTOH__u64(body->oa.o_rdev);
+ __u64 cksum = 0;
+
+ for (mapped = 0; mapped < page_count; mapped++) {
+ char *ptr = kmap(pga[mapped].pg);
+ int off = pga[mapped].off & (PAGE_SIZE - 1);
+ int len = pga[mapped].count;
+
+ LASSERT(off + len <= PAGE_SIZE);
+ ost_checksum(&cksum, ptr + off, len);
+ kunmap(pga[mapped].pg);
+ }
+
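+ /* Throttle the messages below: (x & -x) == x only when x is a
+ * power of two, so only the 1st, 2nd, 4th, 8th, ... occurrence
+ * of each message is printed. */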
+ cksum_counter++;
+ if (server_cksum != cksum) {
+ CERROR("Bad checksum: server "LPX64", client "LPX64
+ ", server NID "LPX64"\n", server_cksum, cksum,
+ imp->imp_connection->c_peer.peer_nid);
+ cksum_counter = 0;
+ } else if ((cksum_counter & (-cksum_counter)) == cksum_counter)
+ CERROR("Checksum %u from "LPX64" OK: "LPX64"\n",
+ cksum_counter,
+ imp->imp_connection->c_peer.peer_nid, cksum);
+ } else {
+ static int cksum_missed;
+ cksum_missed++;
+ if ((cksum_missed & (-cksum_missed)) == cksum_missed)
+ CERROR("Request checksum %u from "LPX64", no reply\n",
+ cksum_missed,
+ imp->imp_connection->c_peer.peer_nid);
+ }
+#endif
+
+ EXIT;
out_req:
ptlrpc_req_finished(request);
- RETURN(rc);
-
- /* Clean up on error. */
-out_unmap:
- while (mapped-- > 0)
- kunmap(pga[mapped].pg);
- obd_kmap_put(page_count);
- ptlrpc_bulk_decref(desc);
- goto out_req;
+ return rc;
}
static int osc_brw_write(struct lustre_handle *conn, struct lov_stripe_md *lsm,
struct ptlrpc_bulk_desc *desc = NULL;
struct ost_body *body;
int rc, size[3] = {sizeof(*body)}, mapped = 0;
- unsigned long flags;
struct obd_ioobj *iooptr;
void *nioptr;
__u32 xid;
+#if CHECKSUM_BULK
+ __u64 cksum = 0;
+#endif
ENTRY;
+restart_bulk:
size[1] = sizeof(struct obd_ioobj);
size[2] = page_count * sizeof(struct niobuf_remote);
ost_pack_ioo(&iooptr, lsm, page_count);
/* end almost identical to brw_read case */
- spin_lock_irqsave(&imp->imp_lock, flags);
- xid = ++imp->imp_last_xid; /* single xid for all pages */
- spin_unlock_irqrestore(&imp->imp_lock, flags);
+ xid = ptlrpc_next_xid(); /* single xid for all pages */
obd_kmap_get(page_count, 0);
for (mapped = 0; mapped < page_count; mapped++) {
struct ptlrpc_bulk_page *bulk = ptlrpc_prep_bulk_page(desc);
- if (bulk == NULL)
- GOTO(out_unmap, rc = -ENOMEM);
+ if (bulk == NULL) {
+ unmap_and_decref_bulk_desc(desc);
+ GOTO(out_req, rc = -ENOMEM);
+ }
bulk->bp_xid = xid; /* single xid for all pages */
bulk->bp_buf = kmap(pga[mapped].pg);
bulk->bp_page = pga[mapped].pg;
- bulk->bp_buflen = PAGE_SIZE;
+ bulk->bp_buflen = pga[mapped].count;
ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
pga[mapped].flag, bulk->bp_xid);
+#if CHECKSUM_BULK
+ ost_checksum(&cksum, bulk->bp_buf, bulk->bp_buflen);
+#endif
}
+#if CHECKSUM_BULK
+ body->oa.o_rdev = HTON__u64(cksum);
+ body->oa.o_valid |= HTON__u32(OBD_MD_FLCKSUM);
+#endif
/*
* Register the bulk first, because the reply could arrive out of
* order, and we want to be ready for the bulk data.
*/
if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_WRITE_BULK)) {
CERROR("obd_fail_loc=%x, skipping register_bulk\n",
- OBD_FAIL_OSC_BRW_WRITE_BULK);
+ OBD_FAIL_OSC_BRW_WRITE_BULK);
} else {
rc = ptlrpc_register_bulk_get(desc);
- if (rc)
- GOTO(out_unmap, rc);
+ if (rc) {
+ unmap_and_decref_bulk_desc(desc);
+ GOTO(out_req, rc);
+ }
obd_brw_set_add(set, desc);
}
+ request->rq_flags |= PTL_RPC_FL_NO_RESEND;
request->rq_replen = lustre_msg_size(1, size);
rc = ptlrpc_queue_wait(request);
- /*
- * XXX: If there is an error during the processing of the callback,
- * such as a timeout in a sleep that it performs, brw_finish
- * will never get called, and we'll leak the desc, fail to kunmap
- * things, cats will live with dogs. One solution would be to
- * export brw_finish as osc_brw_finish, so that the timeout case
- * and its kin could call it for proper cleanup. An alternative
- * would be for an error return from the callback to cause us to
- * clean up, but that doesn't help the truly async cases (like
- * LOV), which will immediately return from their PHASE_START
- * callback, before any such cleanup-requiring error condition can
- * be detected.
- */
+ /* XXX bug 937 here */
+ if (rc == -ETIMEDOUT && (request->rq_flags & PTL_RPC_FL_RESEND)) {
+ DEBUG_REQ(D_HA, request, "BULK TIMEOUT");
+ ptlrpc_req_finished(request);
+ goto restart_bulk;
+ }
+
+ if (rc) {
+ osc_ptl_ev_abort(desc);
+ GOTO(out_req, rc);
+ }
+
+ EXIT;
out_req:
ptlrpc_req_finished(request);
+ return rc;
+}
+
+#ifndef min_t
+#define min_t(type,a,b) (((type)(a) < (type)(b)) ? (type)(a) : (type)(b))
+#endif
+
+#warning "FIXME: make values dynamic based on get_info at setup (bug 665)"
+#define OSC_BRW_MAX_SIZE 65536
+#define OSC_BRW_MAX_IOV min_t(int, PTL_MD_MAX_IOV, OSC_BRW_MAX_SIZE/PAGE_SIZE)
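+/* A single brw therefore moves at most 64kB: with 4kB pages that is
+ * min_t(int, PTL_MD_MAX_IOV, 16) pages per RPC, and osc_brw() below
+ * splits larger transfers into chunks of this size. */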
+
+static int osc_brw(int cmd, struct lustre_handle *conn,
+ struct lov_stripe_md *md, obd_count page_count,
+ struct brw_page *pga, struct obd_brw_set *set,
+ struct obd_trans_info *oti)
+{
+ ENTRY;
+
+ while (page_count) {
+ obd_count pages_per_brw;
+ int rc;
+
+ if (page_count > OSC_BRW_MAX_IOV)
+ pages_per_brw = OSC_BRW_MAX_IOV;
+ else
+ pages_per_brw = page_count;
+
+ if (cmd & OBD_BRW_WRITE)
+ rc = osc_brw_write(conn, md, pages_per_brw, pga,
+ set, oti);
+ else
+ rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
+
+ if (rc != 0)
+ RETURN(rc);
+
+ page_count -= pages_per_brw;
+ pga += pages_per_brw;
+ }
+ RETURN(0);
+}
+
+#ifdef __KERNEL__
+/* Note: caller will lock/unlock, and set uptodate on the pages */
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+static int sanosc_brw_read(struct lustre_handle *conn,
+ struct lov_stripe_md *md,
+ obd_count page_count,
+ struct brw_page *pga,
+ struct obd_brw_set *set)
+{
+ struct ptlrpc_request *request = NULL;
+ struct ost_body *body;
+ struct niobuf_remote *remote, *nio_rep;
+ int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+ struct obd_ioobj *iooptr;
+ void *nioptr;
+ ENTRY;
+
+ size[1] = sizeof(struct obd_ioobj);
+ size[2] = page_count * sizeof(*remote);
+
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_READ, 3,
+ size, NULL);
+ if (!request)
+ RETURN(-ENOMEM);
+
+ body = lustre_msg_buf(request->rq_reqmsg, 0);
+ iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+ nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
+ ost_pack_ioo(&iooptr, md, page_count);
+
+ obd_kmap_get(page_count, 0);
+
+ for (mapped = 0; mapped < page_count; mapped++) {
+ LASSERT(PageLocked(pga[mapped].pg));
+
+ kmap(pga[mapped].pg);
+ ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ pga[mapped].flag, 0);
+ }
+
+ size[1] = page_count * sizeof(*remote);
+ request->rq_replen = lustre_msg_size(2, size);
+
+ rc = ptlrpc_queue_wait(request);
+ if (rc)
+ GOTO(out_unmap, rc);
+
+ nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+ if (!nioptr)
+ GOTO(out_unmap, rc = -EINVAL);
+
+ if (request->rq_repmsg->buflens[1] != size[1]) {
+ CERROR("buffer length wrong (%d vs. %d)\n",
+ request->rq_repmsg->buflens[1], size[1]);
+ GOTO(out_unmap, rc = -EINVAL);
+ }
+
+ for (j = 0; j < page_count; j++) {
+ ost_unpack_niobuf(&nioptr, &remote);
+ }
+
+ nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+ nio_rep = (struct niobuf_remote*)nioptr;
+
+ /* actual read */
+ for (j = 0; j < page_count; j++) {
+ struct page *page = pga[j].pg;
+ struct buffer_head *bh;
+ kdev_t dev;
+
+ /* got san device associated */
+ LASSERT(class_conn2obd(conn));
+ dev = class_conn2obd(conn)->u.cli.cl_sandev;
+
+ /* hole */
+ if (!nio_rep[j].offset) {
+ CDEBUG(D_PAGE, "hole at ino %lu; index %ld\n",
+ page->mapping->host->i_ino,
+ page->index);
+ memset(page_address(page), 0, PAGE_SIZE);
+ continue;
+ }
+
+ if (!page->buffers) {
+ create_empty_buffers(page, dev, PAGE_SIZE);
+ bh = page->buffers;
+
+ clear_bit(BH_New, &bh->b_state);
+ set_bit(BH_Mapped, &bh->b_state);
+ bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+
+ clear_bit(BH_Uptodate, &bh->b_state);
+
+ ll_rw_block(READ, 1, &bh);
+ } else {
+ bh = page->buffers;
+
+ /* if buffer already existed, it must be the
+ * one we mapped before, check it */
+ LASSERT(!test_bit(BH_New, &bh->b_state));
+ LASSERT(test_bit(BH_Mapped, &bh->b_state));
+ LASSERT(bh->b_blocknr ==
+ (unsigned long)nio_rep[j].offset);
+
+ /* wait for its I/O completion */
+ if (test_bit(BH_Lock, &bh->b_state))
+ wait_on_buffer(bh);
+
+ if (!test_bit(BH_Uptodate, &bh->b_state))
+ ll_rw_block(READ, 1, &bh);
+ }
+
+ /* must do synchronous read here */
+ wait_on_buffer(bh);
+ if (!buffer_uptodate(bh)) {
+ /* I/O error */
+ rc = -EIO;
+ goto out_unmap;
+ }
+ }
+
+out_req:
+ ptlrpc_req_finished(request);
RETURN(rc);
+out_unmap:
/* Clean up on error. */
+ while (mapped-- > 0)
+ kunmap(pga[mapped].pg);
+
+ obd_kmap_put(page_count);
+
+ goto out_req;
+}
+
+static int sanosc_brw_write(struct lustre_handle *conn,
+ struct lov_stripe_md *md,
+ obd_count page_count,
+ struct brw_page *pga,
+ struct obd_brw_set *set)
+{
+ struct ptlrpc_request *request = NULL;
+ struct ost_body *body;
+ struct niobuf_remote *remote, *nio_rep;
+ int rc, j, size[3] = {sizeof(*body)}, mapped = 0;
+ struct obd_ioobj *iooptr;
+ void *nioptr;
+ ENTRY;
+
+ size[1] = sizeof(struct obd_ioobj);
+ size[2] = page_count * sizeof(*remote);
+
+ request = ptlrpc_prep_req(class_conn2cliimp(conn), OST_SAN_WRITE,
+ 3, size, NULL);
+ if (!request)
+ RETURN(-ENOMEM);
+
+ body = lustre_msg_buf(request->rq_reqmsg, 0);
+ iooptr = lustre_msg_buf(request->rq_reqmsg, 1);
+ nioptr = lustre_msg_buf(request->rq_reqmsg, 2);
+ ost_pack_ioo(&iooptr, md, page_count);
+
+ /* map pages, and pack request */
+ obd_kmap_get(page_count, 0);
+ for (mapped = 0; mapped < page_count; mapped++) {
+ LASSERT(PageLocked(pga[mapped].pg));
+
+ kmap(pga[mapped].pg);
+ ost_pack_niobuf(&nioptr, pga[mapped].off, pga[mapped].count,
+ pga[mapped].flag, 0);
+ }
+
+ size[1] = page_count * sizeof(*remote);
+ request->rq_replen = lustre_msg_size(2, size);
+
+ rc = ptlrpc_queue_wait(request);
+ if (rc)
+ GOTO(out_unmap, rc);
+
+ nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+ if (!nioptr)
+ GOTO(out_unmap, rc = -EINVAL);
+
+ if (request->rq_repmsg->buflens[1] != size[1]) {
+ CERROR("buffer length wrong (%d vs. %d)\n",
+ request->rq_repmsg->buflens[1], size[1]);
+ GOTO(out_unmap, rc = -EINVAL);
+ }
+
+ for (j = 0; j < page_count; j++) {
+ ost_unpack_niobuf(&nioptr, &remote);
+ }
+
+ nioptr = lustre_msg_buf(request->rq_repmsg, 1);
+ nio_rep = (struct niobuf_remote*)nioptr;
+
+ /* actual write */
+ for (j = 0; j < page_count; j++) {
+ struct page *page = pga[j].pg;
+ struct buffer_head *bh;
+ kdev_t dev;
+
+ /* got san device associated */
+ LASSERT(class_conn2obd(conn));
+ dev = class_conn2obd(conn)->u.cli.cl_sandev;
+
+ if (!page->buffers) {
+ create_empty_buffers(page, dev, PAGE_SIZE);
+ } else {
+ /* checking */
+ LASSERT(!test_bit(BH_New, &page->buffers->b_state));
+ LASSERT(test_bit(BH_Mapped, &page->buffers->b_state));
+ LASSERT(page->buffers->b_blocknr ==
+ (unsigned long)nio_rep[j].offset);
+ }
+ bh = page->buffers;
+
+ LASSERT(bh);
+
+ /* if the buffer is locked, wait for its I/O to complete */
+ if (test_bit(BH_Lock, &bh->b_state))
+ wait_on_buffer(bh);
+
+ clear_bit(BH_New, &bh->b_state);
+ set_bit(BH_Mapped, &bh->b_state);
+
+ /* override the block nr */
+ bh->b_blocknr = (unsigned long)nio_rep[j].offset;
+
+ /* we are about to write it, so set it uptodate and dirty;
+ * the page lock should guarantee there is no race here */
+ set_bit(BH_Uptodate, &bh->b_state);
+ set_bit(BH_Dirty, &bh->b_state);
+
+ ll_rw_block(WRITE, 1, &bh);
+
+ /* must do synchronous write here */
+ wait_on_buffer(bh);
+ if (!buffer_uptodate(bh) || test_bit(BH_Dirty, &bh->b_state)) {
+ /* I/O error */
+ rc = -EIO;
+ goto out_unmap;
+ }
+ }
+
+out_req:
+ ptlrpc_req_finished(request);
+ RETURN(rc);
+
out_unmap:
+ /* Clean up on error. */
while (mapped-- > 0)
kunmap(pga[mapped].pg);
+
obd_kmap_put(page_count);
- ptlrpc_bulk_decref(desc);
+
goto out_req;
}
+#else
+static int sanosc_brw_read(struct lustre_handle *conn,
+ struct lov_stripe_md *md,
+ obd_count page_count,
+ struct brw_page *pga,
+ struct obd_brw_set *set)
+{
+ LBUG();
+ return 0;
+}
-static int osc_brw(int cmd, struct lustre_handle *conn,
- struct lov_stripe_md *md, obd_count page_count,
- struct brw_page *pga, struct obd_brw_set *set,
- struct obd_trans_info *oti)
+static int sanosc_brw_write(struct lustre_handle *conn,
+ struct lov_stripe_md *md,
+ obd_count page_count,
+ struct brw_page *pga,
+ struct obd_brw_set *set)
+{
+ LBUG();
+ return 0;
+}
+#endif
+
+static int sanosc_brw(int cmd, struct lustre_handle *conn,
+ struct lov_stripe_md *md, obd_count page_count,
+ struct brw_page *pga, struct obd_brw_set *set,
+ struct obd_trans_info *oti)
{
ENTRY;
obd_count pages_per_brw;
int rc;
- if (page_count > PTL_MD_MAX_IOV)
- pages_per_brw = PTL_MD_MAX_IOV;
+ if (page_count > OSC_BRW_MAX_IOV)
+ pages_per_brw = OSC_BRW_MAX_IOV;
else
pages_per_brw = page_count;
if (cmd & OBD_BRW_WRITE)
- rc = osc_brw_write(conn, md, pages_per_brw, pga, set, oti);
+ rc = sanosc_brw_write(conn, md, pages_per_brw,
+ pga, set);
else
- rc = osc_brw_read(conn, md, pages_per_brw, pga, set);
+ rc = sanosc_brw_read(conn, md, pages_per_brw, pga, set);
if (rc != 0)
RETURN(rc);
}
RETURN(0);
}
+#endif
static int osc_enqueue(struct lustre_handle *connh, struct lov_stripe_md *lsm,
struct lustre_handle *parent_lock,
GOTO(out, err = -EINVAL);
}
- if (data->ioc_inllen2 < sizeof(uuid.uuid)) {
+ if (data->ioc_inllen2 < sizeof(uuid)) {
OBD_FREE(buf, len);
GOTO(out, err = -EINVAL);
}
desc->ld_default_stripe_size = 0;
desc->ld_default_stripe_offset = 0;
desc->ld_pattern = 0;
- memcpy(desc->ld_uuid.uuid, obddev->obd_uuid.uuid, sizeof(uuid.uuid));
+ memcpy(&desc->ld_uuid, &obddev->obd_uuid, sizeof(uuid));
- memcpy(data->ioc_inlbuf2, obddev->obd_uuid.uuid,
- sizeof(uuid.uuid));
+ memcpy(data->ioc_inlbuf2, &obddev->obd_uuid, sizeof(uuid));
err = copy_to_user((void *)uarg, buf, len);
if (err)
fakeconn.addr = (__u64)(unsigned long)exp;
fakeconn.cookie = exp->exp_cookie;
- ioc_data.ioc_inlbuf1 = &imp->imp_obd->u.cli.cl_target_uuid;
+ ioc_data.ioc_inlbuf1 =
+ (char *)&imp->imp_obd->u.cli.cl_target_uuid;
ioc_data.ioc_offset = active;
rc = obd_iocontrol(IOC_LOV_SET_OSC_ACTIVE, &fakeconn,
sizeof ioc_data, &ioc_data, NULL);
- if (rc) {
- CERROR("disabling %s on LOV %p/%s: %d\n",
+ if (rc)
+ CERROR("error disabling %s on LOV %p/%s: %d\n",
imp->imp_obd->u.cli.cl_target_uuid.uuid,
notify_obd, notify_obd->obd_uuid.uuid, rc);
- }
} else {
CDEBUG(D_HA, "No exports for obd %p/%s, can't notify about "
"%p\n", notify_obd, notify_obd->obd_uuid.uuid,
{
int rc;
unsigned long flags;
+ int msg_flags;
struct ptlrpc_request *req;
+ struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
ENTRY;
+ CDEBUG(D_HA, "%s: entering phase: %d\n",
+ imp->imp_obd->obd_name, phase);
switch(phase) {
case PTLRPC_RECOVD_PHASE_PREPARE: {
- struct ldlm_namespace *ns = imp->imp_obd->obd_namespace;
- ldlm_namespace_cleanup(ns, 1 /* no network ops */);
- ptlrpc_abort_inflight(imp, 0);
- set_osc_active(imp, 0 /* inactive */);
+ if (imp->imp_flags & IMP_REPLAYABLE) {
+ CDEBUG(D_HA, "failover OST\n");
+ /* If we're a failover OSC/OST, just cancel unused
+ * locks to simplify lock replay.
+ */
+ ldlm_cli_cancel_unused(ns, NULL, LDLM_FL_LOCAL_ONLY);
+ } else {
+ CDEBUG(D_HA, "non-failover OST\n");
+ /* Non-failover OSTs (LLNL scenario) disable the OSC
+ * and invalidate local state.
+ */
+ ldlm_namespace_cleanup(ns, 1 /* no network ops */);
+ ptlrpc_abort_inflight(imp, 0);
+ set_osc_active(imp, 0 /* inactive */);
+ }
RETURN(0);
}
- case PTLRPC_RECOVD_PHASE_RECOVER:
+ case PTLRPC_RECOVD_PHASE_RECOVER: {
+ reconnect:
imp->imp_flags &= ~IMP_INVALID;
rc = ptlrpc_reconnect_import(imp, OST_CONNECT, &req);
- ptlrpc_req_finished(req);
+
+ msg_flags = req->rq_repmsg
+ ? lustre_msg_get_op_flags(req->rq_repmsg)
+ : 0;
+
+ if (rc == -EBUSY && (msg_flags & MSG_CONNECT_RECOVERING))
+ CERROR("reconnect denied by recovery; should retry\n");
+
if (rc) {
+ if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
+ CERROR("can't reconnect, invalidating\n");
+ ldlm_namespace_cleanup(ns, 1);
+ ptlrpc_abort_inflight(imp, 0);
+ }
imp->imp_flags |= IMP_INVALID;
+ ptlrpc_req_finished(req);
RETURN(rc);
}
+ if (msg_flags & MSG_CONNECT_RECOVERING) {
+ /* Replay if they want it. */
+ DEBUG_REQ(D_HA, req, "OST wants replay");
+ rc = ptlrpc_replay(imp);
+ if (rc)
+ GOTO(check_rc, rc);
+
+ rc = ldlm_replay_locks(imp);
+ if (rc)
+ GOTO(check_rc, rc);
+
+ rc = signal_completed_replay(imp);
+ if (rc)
+ GOTO(check_rc, rc);
+ } else if (msg_flags & MSG_CONNECT_RECONNECT) {
+ DEBUG_REQ(D_HA, req, "reconnecting to MDS\n");
+ /* Nothing else to do here. */
+ } else {
+ DEBUG_REQ(D_HA, req, "evicted: invalidating\n");
+ /* Otherwise, clean everything up. */
+ ldlm_namespace_cleanup(ns, 1);
+ ptlrpc_abort_inflight(imp, 0);
+ }
+
+ ptlrpc_req_finished(req);
+
spin_lock_irqsave(&imp->imp_lock, flags);
imp->imp_level = LUSTRE_CONN_FULL;
+ imp->imp_flags &= ~IMP_INVALID;
spin_unlock_irqrestore(&imp->imp_lock, flags);
/* Is this the right place? Should we do this in _PREPARE
*/
ptlrpc_wake_delayed(imp);
+ rc = ptlrpc_resend(imp);
+ if (rc)
+ GOTO(check_rc, rc);
+
set_osc_active(imp, 1 /* active */);
RETURN(0);
+ check_rc:
+ /* If we get disconnected in the middle, recovery has probably
+ * failed. Reconnect and find out.
+ */
+ if (rc == -ENOTCONN)
+ goto reconnect;
+ RETURN(rc);
+ }
case PTLRPC_RECOVD_PHASE_NOTCONN:
osc_recover(imp, PTLRPC_RECOVD_PHASE_PREPARE);
RETURN(osc_recover(imp, PTLRPC_RECOVD_PHASE_RECOVER));
o_iocontrol: osc_iocontrol
};
-static int __init osc_init(void)
+struct obd_ops sanosc_obd_ops = {
+ o_owner: THIS_MODULE,
+ o_attach: osc_attach,
+ o_detach: osc_detach,
+ o_cleanup: client_obd_cleanup,
+ o_connect: osc_connect,
+ o_disconnect: client_obd_disconnect,
+ o_statfs: osc_statfs,
+ o_packmd: osc_packmd,
+ o_unpackmd: osc_unpackmd,
+ o_create: osc_create,
+ o_destroy: osc_destroy,
+ o_getattr: osc_getattr,
+ o_setattr: osc_setattr,
+ o_open: osc_open,
+ o_close: osc_close,
+#ifdef __KERNEL__
+ o_setup: client_sanobd_setup,
+ o_brw: sanosc_brw,
+#endif
+ o_punch: osc_punch,
+ o_enqueue: osc_enqueue,
+ o_cancel: osc_cancel,
+ o_cancel_unused: osc_cancel_unused,
+ o_iocontrol: osc_iocontrol,
+};
+
+int __init osc_init(void)
{
struct lprocfs_static_vars lvars;
+ int rc;
+ ENTRY;
+
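+ /* our per-open data is stashed in the obdo's inline buffer, so it
+ * must fit; see the osc_obdo_data comment at the top of this file */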
+ LASSERT(sizeof(struct osc_obdo_data) <= FD_OSTDATA_SIZE);
lprocfs_init_vars(&lvars);
- RETURN(class_register_type(&osc_obd_ops, lvars.module_vars,
- LUSTRE_OSC_NAME));
+
+ rc = class_register_type(&osc_obd_ops, lvars.module_vars,
+ LUSTRE_OSC_NAME);
+ if (rc)
+ RETURN(rc);
+
+ rc = class_register_type(&sanosc_obd_ops, lvars.module_vars,
+ LUSTRE_SANOSC_NAME);
+ if (rc)
+ class_unregister_type(LUSTRE_OSC_NAME);
+
+ RETURN(rc);
}
static void __exit osc_exit(void)
{
+ class_unregister_type(LUSTRE_SANOSC_NAME);
class_unregister_type(LUSTRE_OSC_NAME);
}
+#ifdef __KERNEL__
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");
module_init(osc_init);
module_exit(osc_exit);
+#endif