extern struct lprocfs_vars status_var_nm_1[];
extern struct lprocfs_vars status_class_var[];
-/* should become mdc_getinfo() */
-int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid)
+/* Helper that implements most of mdc_getstatus and signal_completed_replay. */
+static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
+ int level, int msg_flags)
{
struct ptlrpc_request *req;
struct mds_body *body;
int rc, size = sizeof(*body);
ENTRY;
- req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETSTATUS, 1, &size,
- NULL);
+ req = ptlrpc_prep_req(imp, MDS_GETSTATUS, 1, &size, NULL);
if (!req)
GOTO(out, rc = -ENOMEM);
body = lustre_msg_buf(req->rq_reqmsg, 0);
- req->rq_level = LUSTRE_CONN_CON;
+ req->rq_level = level;
req->rq_replen = lustre_msg_size(1, &size);
-
+
mds_pack_req_body(req);
+ req->rq_reqmsg->flags |= msg_flags;
rc = ptlrpc_queue_wait(req);
if (!rc) {
return rc;
}
+/* should become mdc_getinfo() */
+int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid)
+{
+ return send_getstatus(class_conn2cliimp(conn), rootfid, LUSTRE_CONN_CON,
+ 0);
+}
+
int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
struct ptlrpc_request **request)
{
RETURN(rc);
}
-
int mdc_getattr(struct lustre_handle *conn,
- obd_id ino, int type, unsigned long valid, size_t ea_size,
+ obd_id ino, int type, unsigned long valid, unsigned int ea_size,
struct ptlrpc_request **request)
{
struct ptlrpc_request *req;
size[bufcount] = ea_size;
bufcount++;
body->size = ea_size;
- CDEBUG(D_INODE, "reserving %d bytes for MD/symlink in packet\n",
+ CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
ea_size);
}
req->rq_replen = lustre_msg_size(bufcount, size);
return rc;
}
+int mdc_getattr_name(struct lustre_handle *conn, struct inode *parent,
+ char *filename, int namelen, unsigned long valid,
+ unsigned int ea_size, struct ptlrpc_request **request)
+{
+ struct ptlrpc_request *req;
+ struct mds_body *body;
+ int rc, size[2] = {sizeof(*body), namelen}, bufcount = 1;
+ ENTRY;
+
+ req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETATTR_NAME, 2,
+ size, NULL);
+ if (!req)
+ GOTO(out, rc = -ENOMEM);
+
+ body = lustre_msg_buf(req->rq_reqmsg, 0);
+ ll_inode2fid(&body->fid1, parent);
+ body->valid = valid;
+ memcpy(lustre_msg_buf(req->rq_reqmsg, 1), filename, namelen);
+
+ if (ea_size) {
+ size[1] = ea_size;
+ bufcount++;
+ body->size = ea_size;
+ CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
+ ea_size);
+ valid |= OBD_MD_FLEASIZE;
+ }
+
+ req->rq_replen = lustre_msg_size(bufcount, size);
+ mds_pack_req_body(req);
+
+ rc = ptlrpc_queue_wait(req);
+
+ if (!rc) {
+ body = lustre_msg_buf(req->rq_repmsg, 0);
+ mds_unpack_body(body);
+ }
+
+ EXIT;
+ out:
+ *request = req;
+ return rc;
+}
+
void d_delete_aliases(struct inode *inode)
{
struct dentry *dentry = NULL;
break;
case LDLM_CB_CANCELING: {
/* Invalidate all dentries associated with this inode */
- struct inode *inode = data;
-
-#warning "FIXME: what tells us that 'inode' is valid at all?"
- if (inode->i_state & I_FREEING)
- break;
+ struct inode *inode;
- LASSERT(inode != NULL);
+ LASSERT(data != NULL);
LASSERT(data_len == sizeof(*inode));
+ /* XXX what tells us that 'data' is a valid inode at all?
+ * we should probably validate the lock handle first?
+ */
+ inode = igrab(data);
+
+ if (inode == NULL) /* inode->i_state & I_FREEING */
+ break;
+
if (S_ISDIR(inode->i_mode)) {
CDEBUG(D_INODE, "invalidating inode %lu\n",
inode->i_ino);
ll_invalidate_inode_pages(inode);
}
- if (inode != inode->i_sb->s_root->d_inode) {
- /* XXX should this igrab move up 12 lines? */
- LASSERT(igrab(inode) == inode);
+ if (inode != inode->i_sb->s_root->d_inode)
d_delete_aliases(inode);
- iput(inode);
- }
+
+ iput(inode);
break;
}
default:
struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff);
struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff);
- DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
- body->fid1.generation, body->fid1.id);
memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
+ DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
+ rec->cr_replayfid.generation, rec->cr_replayfid.id);
}
+/* We always reserve enough space in the reply packet for a stripe MD, because
+ * we don't know in advance the file type.
+ *
+ * XXX we could get that from ext2_dir_entry_2 file_type
+ */
int mdc_enqueue(struct lustre_handle *conn, int lock_type,
struct lookup_intent *it, int lock_mode, struct inode *dir,
struct dentry *de, struct lustre_handle *lockh,
&lockh2)) {
/* We already have a lock; cancel the old one */
ldlm_lock_decref(lockh, lock_mode);
- ldlm_cli_cancel(lockh);
+ /* FIXME: bug 563 */
+ //ldlm_cli_cancel(lockh);
memcpy(lockh, &lockh2, sizeof(lockh2));
}
LDLM_LOCK_PUT(lock);
memcpy(saved->fh, &body->handle, sizeof(body->handle));
}
+/* If lmm is non-NULL and lmm_size is non-zero, the stripe MD is stored on
+ * the MDS. Otherwise, we have already read a copy from the MDS (probably
+ * during mdc_enqueue() and we do not need to send it to the MDS again.
+ *
+ * In the future (when we support the non-intent case) we need to be able
+ * to read the stripe MD from the MDS here (need to fix mds_open() too).
+ */
int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
struct lov_mds_md *lmm, int lmm_size, struct lustre_handle *fh,
struct ptlrpc_request **request)
struct ptlrpc_request *req;
ENTRY;
- if (lmm && lmm_size) {
+ if (lmm_size) {
bufcount = 3;
- size[2] = size[1]; /* shuffle the spare data along */
+ size[2] = size[1]; /* shuffle the replay data along */
size[1] = lmm_size;
}
body->flags = HTON__u32(flags);
memcpy(&body->handle, fh, sizeof(body->handle));
- if (lmm && lmm_size) {
- CDEBUG(D_INODE, "sending %u bytes MD for ino "LPU64"\n",
- lmm_size, ino);
- lustre_msg_set_op_flags(req->rq_reqmsg, MDS_OPEN_HAS_EA);
- memcpy(lustre_msg_buf(req->rq_reqmsg, 1), lmm, lmm_size);
+ if (lmm_size) {
body->flags |= HTON__u32(OBD_MD_FLEASIZE);
+ if (lmm) {
+ CDEBUG(D_INODE, "sending %u bytes MD for ino "LPU64"\n",
+ lmm_size, ino);
+ lustre_msg_set_op_flags(req->rq_reqmsg,MDS_OPEN_HAS_EA);
+ memcpy(lustre_msg_buf(req->rq_reqmsg,1), lmm, lmm_size);
+ }
}
req->rq_replen = lustre_msg_size(1, size);
body = lustre_msg_buf(req->rq_repmsg, 0);
mds_unpack_body(body);
memcpy(fh, &body->handle, sizeof(*fh));
- }
- /* If open is replayed, we need to fix up the fh. */
- req->rq_replay_cb = mdc_replay_open;
- replay_data = lustre_msg_buf(req->rq_reqmsg, lmm ? 2 : 1);
- replay_data->fh = fh;
+ /* If open is replayed, we need to fix up the fh. */
+ req->rq_replay_cb = mdc_replay_open;
+ replay_data = lustre_msg_buf(req->rq_reqmsg, lmm ? 2 : 1);
+ replay_data->fh = fh;
+ }
EXIT;
out:
return lprocfs_dereg_obd(dev);
}
+/* Send a mostly-dummy GETSTATUS request and indicate that we're done replay. */
+static int signal_completed_replay(struct obd_import *imp)
+{
+ struct ll_fid fid;
+
+ return send_getstatus(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
+}
+
static int mdc_recover(struct obd_import *imp, int phase)
{
int rc;
+ unsigned long flags;
+ struct ptlrpc_request *req;
ENTRY;
switch(phase) {
RETURN(0);
case PTLRPC_RECOVD_PHASE_RECOVER:
reconnect:
- rc = ptlrpc_reconnect_import(imp, MDS_CONNECT);
+ rc = ptlrpc_reconnect_import(imp, MDS_CONNECT, &req);
+
+ /* We were still connected, just go about our business. */
if (rc == EALREADY)
- RETURN(ptlrpc_replay(imp, 0));
- if (rc)
+ GOTO(skip_replay, rc);
+
+ if (rc) {
+ ptlrpc_req_finished(req);
RETURN(rc);
+ }
+
+ /* We can't replay, which might be a problem. */
+ if (!(lustre_msg_get_flags(req->rq_repmsg) &
+ MSG_REPLAY_IN_PROGRESS)) {
+ if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
+ CERROR("can't replay, invalidating\n");
+ ldlm_namespace_cleanup(imp->imp_obd->obd_namespace,
+ 1);
+ ptlrpc_abort_inflight(imp);
+ }
+ goto skip_replay;
+ }
- rc = ptlrpc_replay(imp, 0 /* no last flag*/);
+ rc = ptlrpc_replay(imp);
if (rc)
RETURN(rc);
if (rc)
RETURN(rc);
- spin_lock(&imp->imp_lock);
+ rc = signal_completed_replay(imp);
+ if (rc)
+ RETURN(rc);
+
+ skip_replay:
+ ptlrpc_req_finished(req);
+ spin_lock_irqsave(&imp->imp_lock, flags);
imp->imp_level = LUSTRE_CONN_FULL;
- spin_unlock(&imp->imp_lock);
+ imp->imp_flags &= ~IMP_INVALID;
+ spin_unlock_irqrestore(&imp->imp_lock, flags);
ptlrpc_wake_delayed(imp);
}
struct obd_ops mdc_obd_ops = {
- o_attach: mdc_attach,
- o_detach: mdc_detach,
- o_setup: client_obd_setup,
- o_cleanup: client_obd_cleanup,
- o_connect: mdc_connect,
- o_disconnect: client_obd_disconnect,
- o_statfs: mdc_statfs,
+ o_owner: THIS_MODULE,
+ o_attach: mdc_attach,
+ o_detach: mdc_detach,
+ o_setup: client_obd_setup,
+ o_cleanup: client_obd_cleanup,
+ o_connect: mdc_connect,
+ o_disconnect: client_obd_disconnect,
+ o_statfs: mdc_statfs
};
static int __init ptlrpc_request_init(void)
EXPORT_SYMBOL(mdc_enqueue);
EXPORT_SYMBOL(mdc_cancel_unused);
EXPORT_SYMBOL(mdc_getattr);
+EXPORT_SYMBOL(mdc_getattr_name);
EXPORT_SYMBOL(mdc_create);
EXPORT_SYMBOL(mdc_unlink);
EXPORT_SYMBOL(mdc_rename);