Whamcloud - gitweb
Merge b_md to HEAD for 0.5.19 release.
[fs/lustre-release.git] / lustre / mdc / mdc_request.c
index c856d10..a97cfb5 100644 (file)
@@ -37,24 +37,25 @@ extern int mds_queue_req(struct ptlrpc_request *);
 extern struct lprocfs_vars status_var_nm_1[];
 extern struct lprocfs_vars status_class_var[];
 
-/* should become mdc_getinfo() */
-int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid)
+/* Helper that implements most of mdc_getstatus and signal_completed_replay. */
+static int send_getstatus(struct obd_import *imp, struct ll_fid *rootfid,
+                          int level, int msg_flags)
 {
         struct ptlrpc_request *req;
         struct mds_body *body;
         int rc, size = sizeof(*body);
         ENTRY;
 
-        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETSTATUS, 1, &size,
-                              NULL);
+        req = ptlrpc_prep_req(imp, MDS_GETSTATUS, 1, &size, NULL);
         if (!req)
                 GOTO(out, rc = -ENOMEM);
 
         body = lustre_msg_buf(req->rq_reqmsg, 0);
-        req->rq_level = LUSTRE_CONN_CON;
+        req->rq_level = level;
         req->rq_replen = lustre_msg_size(1, &size);
-
+        
         mds_pack_req_body(req);
+        req->rq_reqmsg->flags |= msg_flags;
         rc = ptlrpc_queue_wait(req);
 
         if (!rc) {
@@ -74,6 +75,13 @@ int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid)
         return rc;
 }
 
+/* should become mdc_getinfo() */
+int mdc_getstatus(struct lustre_handle *conn, struct ll_fid *rootfid)
+{
+        return send_getstatus(class_conn2cliimp(conn), rootfid, LUSTRE_CONN_CON,
+                              0);
+}
+
 int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
                    struct ptlrpc_request **request)
 {
@@ -104,9 +112,8 @@ int mdc_getlovinfo(struct obd_device *obd, struct lustre_handle *mdc_connh,
         RETURN(rc);
 }
 
-
 int mdc_getattr(struct lustre_handle *conn,
-                obd_id ino, int type, unsigned long valid, size_t ea_size,
+                obd_id ino, int type, unsigned long valid, unsigned int ea_size,
                 struct ptlrpc_request **request)
 {
         struct ptlrpc_request *req;
@@ -130,7 +137,7 @@ int mdc_getattr(struct lustre_handle *conn,
                 size[bufcount] = ea_size;
                 bufcount++;
                 body->size = ea_size;
-                CDEBUG(D_INODE, "reserving %d bytes for MD/symlink in packet\n",
+                CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
                        ea_size);
         }
         req->rq_replen = lustre_msg_size(bufcount, size);
@@ -150,6 +157,50 @@ int mdc_getattr(struct lustre_handle *conn,
         return rc;
 }
 
+int mdc_getattr_name(struct lustre_handle *conn, struct inode *parent,
+                     char *filename, int namelen, unsigned long valid,
+                     unsigned int ea_size, struct ptlrpc_request **request)
+{
+        struct ptlrpc_request *req;
+        struct mds_body *body;
+        int rc, size[2] = {sizeof(*body), namelen}, bufcount = 1;
+        ENTRY;
+
+        req = ptlrpc_prep_req(class_conn2cliimp(conn), MDS_GETATTR_NAME, 2,
+                              size, NULL);
+        if (!req)
+                GOTO(out, rc = -ENOMEM);
+
+        body = lustre_msg_buf(req->rq_reqmsg, 0);
+        ll_inode2fid(&body->fid1, parent);
+        body->valid = valid;
+        memcpy(lustre_msg_buf(req->rq_reqmsg, 1), filename, namelen);
+
+        if (ea_size) {
+                size[1] = ea_size;
+                bufcount++;
+                body->size = ea_size;
+                CDEBUG(D_INODE, "reserved %u bytes for MD/symlink in packet\n",
+                       ea_size);
+                valid |= OBD_MD_FLEASIZE;
+        }
+
+        req->rq_replen = lustre_msg_size(bufcount, size);
+        mds_pack_req_body(req);
+
+        rc = ptlrpc_queue_wait(req);
+
+        if (!rc) {
+                body = lustre_msg_buf(req->rq_repmsg, 0);
+                mds_unpack_body(body);
+        }
+
+        EXIT;
+ out:
+        *request = req;
+        return rc;
+}
+
 void d_delete_aliases(struct inode *inode)
 {
         struct dentry *dentry = NULL;
@@ -187,15 +238,19 @@ static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                 break;
         case LDLM_CB_CANCELING: {
                 /* Invalidate all dentries associated with this inode */
-                struct inode *inode = data;
-
-#warning "FIXME: what tells us that 'inode' is valid at all?"
-                if (inode->i_state & I_FREEING)
-                        break;
+                struct inode *inode;
 
-                LASSERT(inode != NULL);
+                LASSERT(data != NULL);
                 LASSERT(data_len == sizeof(*inode));
 
+                /* XXX what tells us that 'data' is a valid inode at all?
+                 *     we should probably validate the lock handle first?
+                 */
+                inode = igrab(data);
+
+                if (inode == NULL)      /* inode->i_state & I_FREEING */
+                        break;
+
                 if (S_ISDIR(inode->i_mode)) {
                         CDEBUG(D_INODE, "invalidating inode %lu\n",
                                inode->i_ino);
@@ -203,12 +258,10 @@ static int mdc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                         ll_invalidate_inode_pages(inode);
                 }
 
-                if (inode != inode->i_sb->s_root->d_inode) {
-                        /* XXX should this igrab move up 12 lines? */
-                        LASSERT(igrab(inode) == inode);
+                if (inode != inode->i_sb->s_root->d_inode)
                         d_delete_aliases(inode);
-                        iput(inode);
-                }
+
+                iput(inode);
                 break;
         }
         default:
@@ -225,11 +278,16 @@ void mdc_store_inode_generation(struct ptlrpc_request *req, int reqoff,
         struct mds_rec_create *rec = lustre_msg_buf(req->rq_reqmsg, reqoff);
         struct mds_body *body = lustre_msg_buf(req->rq_repmsg, repoff);
 
-        DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
-                  body->fid1.generation, body->fid1.id);
         memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid);
+        DEBUG_REQ(D_HA, req, "storing generation %x for ino "LPD64,
+                  rec->cr_replayfid.generation, rec->cr_replayfid.id);
 }
 
+/* We always reserve enough space in the reply packet for a stripe MD, because
+ * we don't know in advance the file type.
+ *
+ * XXX we could get that from ext2_dir_entry_2 file_type
+ */
 int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                 struct lookup_intent *it, int lock_mode, struct inode *dir,
                 struct dentry *de, struct lustre_handle *lockh,
@@ -408,7 +466,8 @@ int mdc_enqueue(struct lustre_handle *conn, int lock_type,
                                     &lockh2)) {
                         /* We already have a lock; cancel the old one */
                         ldlm_lock_decref(lockh, lock_mode);
-                        ldlm_cli_cancel(lockh);
+                        /* FIXME: bug 563 */
+                        //ldlm_cli_cancel(lockh);
                         memcpy(lockh, &lockh2, sizeof(lockh2));
                 }
                 LDLM_LOCK_PUT(lock);
@@ -459,6 +518,13 @@ static void mdc_replay_open(struct ptlrpc_request *req)
         memcpy(saved->fh, &body->handle, sizeof(body->handle));
 }
 
+/* If lmm is non-NULL and lmm_size is non-zero, the stripe MD is stored on
+ * the MDS.  Otherwise, we have already read a copy from the MDS (probably
+ * during mdc_enqueue() and we do not need to send it to the MDS again.
+ *
+ * In the future (when we support the non-intent case) we need to be able
+ * to read the stripe MD from the MDS here (need to fix mds_open() too).
+ */
 int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
              struct lov_mds_md *lmm, int lmm_size, struct lustre_handle *fh,
              struct ptlrpc_request **request)
@@ -469,9 +535,9 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
         struct ptlrpc_request *req;
         ENTRY;
 
-        if (lmm && lmm_size) {
+        if (lmm_size) {
                 bufcount = 3;
-                size[2] = size[1]; /* shuffle the spare data along */
+                size[2] = size[1]; /* shuffle the replay data along */
                 size[1] = lmm_size;
         }
 
@@ -487,12 +553,14 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
         body->flags = HTON__u32(flags);
         memcpy(&body->handle, fh, sizeof(body->handle));
 
-        if (lmm && lmm_size) {
-                CDEBUG(D_INODE, "sending %u bytes MD for ino "LPU64"\n",
-                       lmm_size, ino);
-                lustre_msg_set_op_flags(req->rq_reqmsg, MDS_OPEN_HAS_EA);
-                memcpy(lustre_msg_buf(req->rq_reqmsg, 1), lmm, lmm_size);
+        if (lmm_size) {
                 body->flags |= HTON__u32(OBD_MD_FLEASIZE);
+                if (lmm) {
+                        CDEBUG(D_INODE, "sending %u bytes MD for ino "LPU64"\n",
+                               lmm_size, ino);
+                        lustre_msg_set_op_flags(req->rq_reqmsg,MDS_OPEN_HAS_EA);
+                        memcpy(lustre_msg_buf(req->rq_reqmsg,1), lmm, lmm_size);
+                }
         }
 
         req->rq_replen = lustre_msg_size(1, size);
@@ -502,12 +570,12 @@ int mdc_open(struct lustre_handle *conn, obd_id ino, int type, int flags,
                 body = lustre_msg_buf(req->rq_repmsg, 0);
                 mds_unpack_body(body);
                 memcpy(fh, &body->handle, sizeof(*fh));
-        }
 
-        /* If open is replayed, we need to fix up the fh. */
-        req->rq_replay_cb = mdc_replay_open;
-        replay_data = lustre_msg_buf(req->rq_reqmsg, lmm ? 2 : 1);
-        replay_data->fh = fh;
+                /* If open is replayed, we need to fix up the fh. */
+                req->rq_replay_cb = mdc_replay_open;
+                replay_data = lustre_msg_buf(req->rq_reqmsg, lmm ? 2 : 1);
+                replay_data->fh = fh;
+        }
 
         EXIT;
  out:
@@ -635,9 +703,19 @@ static int mdc_detach(struct obd_device *dev)
         return lprocfs_dereg_obd(dev);
 }
 
+/* Send a mostly-dummy GETSTATUS request and indicate that we're done replay. */
+static int signal_completed_replay(struct obd_import *imp)
+{
+        struct ll_fid fid;
+        
+        return send_getstatus(imp, &fid, LUSTRE_CONN_RECOVD, MSG_LAST_REPLAY);
+}
+
 static int mdc_recover(struct obd_import *imp, int phase)
 {
         int rc;
+        unsigned long flags;
+        struct ptlrpc_request *req;
         ENTRY;
 
         switch(phase) {
@@ -647,13 +725,30 @@ static int mdc_recover(struct obd_import *imp, int phase)
                 RETURN(0);
             case PTLRPC_RECOVD_PHASE_RECOVER:
         reconnect:
-                rc = ptlrpc_reconnect_import(imp, MDS_CONNECT);
+                rc = ptlrpc_reconnect_import(imp, MDS_CONNECT, &req);
+
+                /* We were still connected, just go about our business. */
                 if (rc == EALREADY)
-                        RETURN(ptlrpc_replay(imp, 0));
-                if (rc)
+                        GOTO(skip_replay, rc);
+
+                if (rc) {
+                        ptlrpc_req_finished(req);
                         RETURN(rc);
+                }
+                
+                /* We can't replay, which might be a problem. */
+                if (!(lustre_msg_get_flags(req->rq_repmsg) &
+                      MSG_REPLAY_IN_PROGRESS)) {
+                        if (phase != PTLRPC_RECOVD_PHASE_NOTCONN) {
+                             CERROR("can't replay, invalidating\n");
+                             ldlm_namespace_cleanup(imp->imp_obd->obd_namespace,
+                                                    1);
+                             ptlrpc_abort_inflight(imp);
+                        }
+                        goto skip_replay;
+                }
 
-                rc = ptlrpc_replay(imp, 0 /* no last flag*/);
+                rc = ptlrpc_replay(imp);
                 if (rc)
                         RETURN(rc);
 
@@ -661,9 +756,16 @@ static int mdc_recover(struct obd_import *imp, int phase)
                 if (rc)
                         RETURN(rc);
 
-                spin_lock(&imp->imp_lock);
+                rc = signal_completed_replay(imp);
+                if (rc)
+                        RETURN(rc);
+
+        skip_replay:
+                ptlrpc_req_finished(req);
+                spin_lock_irqsave(&imp->imp_lock, flags);
                 imp->imp_level = LUSTRE_CONN_FULL;
-                spin_unlock(&imp->imp_lock);
+                imp->imp_flags &= ~IMP_INVALID;
+                spin_unlock_irqrestore(&imp->imp_lock, flags);
 
                 ptlrpc_wake_delayed(imp);
 
@@ -693,13 +795,14 @@ static int mdc_connect(struct lustre_handle *conn, struct obd_device *obd,
 }
 
 struct obd_ops mdc_obd_ops = {
-        o_attach: mdc_attach,
-        o_detach: mdc_detach,
-        o_setup:   client_obd_setup,
-        o_cleanup: client_obd_cleanup,
-        o_connect: mdc_connect,
-        o_disconnect: client_obd_disconnect,
-        o_statfs: mdc_statfs,
+        o_owner:       THIS_MODULE,
+        o_attach:      mdc_attach,
+        o_detach:      mdc_detach,
+        o_setup:       client_obd_setup,
+        o_cleanup:     client_obd_cleanup,
+        o_connect:     mdc_connect,
+        o_disconnect:  client_obd_disconnect,
+        o_statfs:      mdc_statfs
 };
 
 static int __init ptlrpc_request_init(void)
@@ -723,6 +826,7 @@ EXPORT_SYMBOL(mdc_getlovinfo);
 EXPORT_SYMBOL(mdc_enqueue);
 EXPORT_SYMBOL(mdc_cancel_unused);
 EXPORT_SYMBOL(mdc_getattr);
+EXPORT_SYMBOL(mdc_getattr_name);
 EXPORT_SYMBOL(mdc_create);
 EXPORT_SYMBOL(mdc_unlink);
 EXPORT_SYMBOL(mdc_rename);