Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre / obdfilter / filter.c
index 4faa1ef..ab194ce 100644 (file)
 #include <linux/dcache.h>
 #include <linux/init.h>
 #include <linux/version.h>
+#include <linux/jbd.h>
+#include <linux/ext3_fs.h>
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 # include <linux/mount.h>
 # include <linux/buffer_head.h>
+# include <linux/bio.h>
 #endif
 
 #include <linux/obd_class.h>
@@ -108,6 +111,7 @@ int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
                                 cpu_to_le64(last_rcvd);
                 spin_unlock(&filter->fo_translock);
         }
+        
         fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);
 
         /* could get xid from oti, if it's ever needed */
@@ -480,9 +484,10 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                  * need to be set up like real exports as filter_connect() does.
                  */
                 exp = class_new_export(obd);
-                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
-                       " srv lr: "LPU64"\n", fcd->fcd_uuid, cl_idx,
-                       last_rcvd, le64_to_cpu(fsd->fsd_last_transno));
+                CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64" "
+                       "srv lr: "LPU64" fcd_group %d \n", fcd->fcd_uuid, cl_idx,
+                       last_rcvd, le64_to_cpu(fsd->fsd_last_transno), 
+                       le32_to_cpu(fcd->fcd_group));
                 if (exp == NULL)
                         GOTO(err_client, rc = -ENOMEM);
 
@@ -490,6 +495,7 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
                        sizeof exp->exp_client_uuid.uuid);
                 fed = &exp->exp_filter_data;
                 fed->fed_fcd = fcd;
+                fed->fed_group = le32_to_cpu(fcd->fcd_group);
                 filter_client_add(obd, filter, fed, cl_idx);
                 /* create helper if export init gets more complex */
                 spin_lock_init(&fed->fed_lock);
@@ -511,8 +517,9 @@ static int filter_init_server_data(struct obd_device *obd, struct file * filp)
         obd->obd_last_committed = le64_to_cpu(fsd->fsd_last_transno);
 
         if (obd->obd_recoverable_clients) {
-                CWARN("RECOVERY: %d recoverable clients, last_rcvd "
-                      LPU64"\n", obd->obd_recoverable_clients,
+                CWARN("RECOVERY: service %s, %d recoverable clients, "
+                      "last_transno "LPU64"\n", obd->obd_name,
+                      obd->obd_recoverable_clients,
                       le64_to_cpu(fsd->fsd_last_transno));
                 obd->obd_next_recovery_transno = obd->obd_last_committed + 1;
                 target_start_recovery_thread(obd, ost_handle);
@@ -527,8 +534,9 @@ out:
 
         /* save it, so mount count and last_transno is current */
         rc = filter_update_server_data(obd, filp, filter->fo_fsd, 1);
-
-        RETURN(rc);
+        if (rc)
+                GOTO(err_client, rc);
+        RETURN(0);
 
 err_client:
         class_disconnect_exports(obd, 0);
@@ -1173,10 +1181,17 @@ struct dentry *filter_fid2dentry(struct obd_device *obd,
         if (dir_dentry == NULL)
                 filter_parent_unlock(dparent, lock);
         if (IS_ERR(dchild)) {
-                CERROR("child lookup error %ld\n", PTR_ERR(dchild));
+                CERROR("%s: child lookup error %ld\n", obd->obd_name,
+                       PTR_ERR(dchild));
                 RETURN(dchild);
         }
 
+        if (dchild->d_inode != NULL && is_bad_inode(dchild->d_inode)) {
+                CERROR("%s: got bad inode "LPU64"\n", obd->obd_name, id);
+                f_dput(dchild);
+                RETURN(ERR_PTR(-ENOENT));
+        }
+
         CDEBUG(D_INODE, "got child objid %s: %p, count = %d\n",
                name, dchild, atomic_read(&dchild->d_count));
 
@@ -1244,10 +1259,10 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         struct ldlm_resource *res = lock->l_resource;
         ldlm_processing_policy policy;
         struct ost_lvb *res_lvb, *reply_lvb;
+        struct ldlm_reply *rep;
         struct list_head *tmp;
         ldlm_error_t err;
-        int tmpflags = 0, rc, repsize[2] = {sizeof(struct ldlm_reply),
-                                            sizeof(struct ost_lvb) };
+        int tmpflags = 0, rc, repsize[2] = {sizeof(*rep), sizeof(*reply_lvb)};
         ENTRY;
 
         policy = ldlm_get_processing_policy(res);
@@ -1258,6 +1273,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         if (rc)
                 RETURN(req->rq_status = rc);
 
+        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
+        LASSERT(rep != NULL);
+
         reply_lvb = lustre_msg_buf(req->rq_repmsg, 1, sizeof(*reply_lvb));
         LASSERT(reply_lvb != NULL);
 
@@ -1299,8 +1317,7 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         down(&res->lr_lvb_sem);
         res_lvb = res->lr_lvb_data;
         LASSERT(res_lvb != NULL);
-        reply_lvb->lvb_size = res_lvb->lvb_size;
-        reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
+        *reply_lvb = *res_lvb;
         up(&res->lr_lvb_sem);
 
         list_for_each(tmp, &res->lr_granted) {
@@ -1331,7 +1348,14 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         if (l == NULL)
                 RETURN(ELDLM_LOCK_ABORTED);
 
-        LASSERT(l->l_glimpse_ast != NULL);
+        if (l->l_glimpse_ast == NULL) {
+                /* We are racing with unlink(); just return -ENOENT */
+                rep->lock_policy_res1 = -ENOENT;
+                goto out;
+        }
+
+        LASSERTF(l->l_glimpse_ast != NULL, "l == %p", l);
+
         rc = l->l_glimpse_ast(l, NULL); /* this will update the LVB */
         if (rc != 0 && res->lr_namespace->ns_lvbo &&
             res->lr_namespace->ns_lvbo->lvbo_update) {
@@ -1339,10 +1363,9 @@ static int filter_intent_policy(struct ldlm_namespace *ns,
         }
 
         down(&res->lr_lvb_sem);
-        reply_lvb->lvb_size = res_lvb->lvb_size;
-        reply_lvb->lvb_blocks = res_lvb->lvb_blocks;
+        *reply_lvb = *res_lvb;
         up(&res->lr_lvb_sem);
-
+out:
         LDLM_LOCK_PUT(l);
 
         RETURN(ELDLM_LOCK_ABORTED);
@@ -1403,13 +1426,13 @@ static int filter_post_fs_setup(struct obd_device *obd)
 }
 
 /* mount the file system (secretly) */
-int filter_common_setup(struct obd_device *obd, obd_count len,
-                        void *buf, char *option)
+int filter_common_setup(struct obd_device *obd, obd_count len, void *buf,
+                        char *option)
 {
         struct lustre_cfg* lcfg = buf;
         struct filter_obd *filter = &obd->u.filter;
         struct vfsmount *mnt;
-        char name[32] = "CATLIST";
+        char ns_name[48];
         int rc = 0, i;
         ENTRY;
 
@@ -1483,8 +1506,9 @@ int filter_common_setup(struct obd_device *obd, obd_count len,
         INIT_LIST_HEAD(&filter->fo_llog_list);
         spin_lock_init(&filter->fo_llog_list_lock);
 
-        obd->obd_namespace = ldlm_namespace_new("filter-tgt",
-                                                LDLM_NAMESPACE_SERVER);
+        sprintf(ns_name, "filter-%s", obd->obd_uuid.uuid);
+        obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
+
         if (obd->obd_namespace == NULL)
                 GOTO(err_post, rc = -ENOMEM);
         obd->obd_namespace->ns_lvbp = obd;
@@ -1494,7 +1518,7 @@ int filter_common_setup(struct obd_device *obd, obd_count len,
         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
                            "filter_ldlm_cb_client", &obd->obd_ldlm_client);
 
-        rc = obd_llog_cat_initialize(obd, &obd->obd_llogs, 1, name);
+        rc = obd_llog_cat_initialize(obd, &obd->obd_llogs, 1, CATLIST);
         if (rc) {
                 CERROR("failed to setup llogging subsystems\n");
                 GOTO(err_post, rc);
@@ -1575,6 +1599,8 @@ static int filter_cleanup(struct obd_device *obd, int flags)
                 }
         }
 
+        target_cleanup_recovery(obd);
+        
         ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
 
         if (filter->fo_sb == NULL)
@@ -1659,6 +1685,8 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
 
         spin_lock_init(&fed->fed_lock);
 
+       /* connection from MDS */
+        group = connect_flags;
         if (obd->obd_replayable) {
                 OBD_ALLOC(fcd, sizeof(*fcd));
                 if (!fcd) {
@@ -1668,19 +1696,16 @@ static int filter_connect(struct lustre_handle *conn, struct obd_device *obd,
 
                 memcpy(fcd->fcd_uuid, cluuid, sizeof(fcd->fcd_uuid));
                 fed->fed_fcd = fcd;
-
+                fed->fed_fcd->fcd_group = group;
                 rc = filter_client_add(obd, filter, fed, -1);
                 if (rc)
                         GOTO(cleanup, rc);
         }
-
-        if (connect_flags == 0)
-                GOTO(cleanup, rc);
-
-        /* connection from MDS */
-        group = connect_flags;
         CWARN("%s: Received MDS connection ("LPX64"); group %d\n",
               obd->obd_name, exp->exp_handle.h_cookie, group);
+        if (group == 0)
+                GOTO(cleanup, rc);
         
         if (fed->fed_group != 0 && fed->fed_group != group) {
                 char str[PTL_NALFMT_SIZE];
@@ -2529,7 +2554,7 @@ static int filter_destroy(struct obd_export *exp, struct obdo *oa,
                 goto acquire_locks;
         }
 
-        handle = fsfilt_start_log(obd, dparent->d_inode, FSFILT_OP_UNLINK, oti, 1);
+        handle = fsfilt_start_log(obd, dparent->d_inode,FSFILT_OP_UNLINK,oti,1);
         if (IS_ERR(handle))
                 GOTO(cleanup, rc = PTR_ERR(handle));