Whamcloud - gitweb
- CROW-related fixes from b_hd_mdref
authoralex <alex>
Mon, 4 Jul 2005 06:28:56 +0000 (06:28 +0000)
committeralex <alex>
Mon, 4 Jul 2005 06:28:56 +0000 (06:28 +0000)
lustre/include/linux/lustre_export.h
lustre/include/linux/obd.h
lustre/lov/lov_obd.c
lustre/mds/mds_lov.c
lustre/mds/mds_open.c
lustre/obdfilter/filter.c
lustre/osc/osc_create.c
lustre/osc/osc_request.c

index 8f9b829..1fc3263 100644 (file)
@@ -33,6 +33,8 @@ struct osc_creator {
         spinlock_t              oscc_lock;
         struct obd_device      *oscc_obd;
         int                     oscc_flags;
         spinlock_t              oscc_lock;
         struct obd_device      *oscc_obd;
         int                     oscc_flags;
+        obd_id                  oscc_next_id;
+        wait_queue_head_t       oscc_waitq;
 };
 
 struct ldlm_export_data {
 };
 
 struct ldlm_export_data {
index 23f46e6..104acc0 100644 (file)
@@ -553,8 +553,6 @@ struct niobuf_local {
 /* Don't conflict with on-wire flags OBD_BRW_WRITE, etc */
 #define N_LOCAL_TEMP_PAGE 0x10000000
 
 /* Don't conflict with on-wire flags OBD_BRW_WRITE, etc */
 #define N_LOCAL_TEMP_PAGE 0x10000000
 
-typedef int (*obd_obj_alloc_func_t)(obd_id *objid);
-
 struct obd_trans_info {
         __u64                    oti_transno;
         __u64                   *oti_objid;
 struct obd_trans_info {
         __u64                    oti_transno;
         __u64                   *oti_objid;
@@ -569,7 +567,6 @@ struct obd_trans_info {
         struct llog_cookie      *oti_logcookies;
         int                      oti_numcookies;
         int                      oti_flags;
         struct llog_cookie      *oti_logcookies;
         int                      oti_numcookies;
         int                      oti_flags;
-        obd_obj_alloc_func_t     oti_obj_alloc;
 };
 
 static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies)
 };
 
 static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies)
index 152d5d7..3186999 100644 (file)
@@ -763,21 +763,9 @@ lov_create(struct obd_export *exp, struct obdo *src_oa,
                 struct lov_request *req = 
                         list_entry(pos, struct lov_request, rq_link);
 
                 struct lov_request *req = 
                         list_entry(pos, struct lov_request, rq_link);
 
-                obd_id *objids = oti->oti_objid;
-
-                if (oti->oti_obj_alloc) {
-                        __u64 next_id;
-                                
-                        /* 
-                         * allocating new objid. Here it is delegated to caller,
-                         * that is MDS in CROW case.
-                         */
-                        next_id = oti->oti_obj_alloc(&objids[req->rq_idx]);
-                        req->rq_oa->o_id = next_id;
-                } else {
-                        /* and here is default "allocator" */
-                        req->rq_oa->o_id = ++objids[req->rq_idx];
-                }
+                /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
+                rc = obd_create(lov->tgts[req->rq_idx].ltd_exp,
+                                req->rq_oa, NULL, 0, &req->rq_md, oti);
                 lov_update_create_set(set, req, rc);
         }
         rc = lov_fini_create_set(set, ea);
                 lov_update_create_set(set, req, rc);
         }
         rc = lov_fini_create_set(set, ea);
index ced694c..2270fd7 100644 (file)
@@ -323,6 +323,7 @@ int mds_dt_connect(struct obd_device *obd, char *lov_name)
                                        "writing objids file: %d\n", rc);
                 }
         }
                                        "writing objids file: %d\n", rc);
                 }
         }
+
         /*
          * I want to see a callback happen when the OBD moves to a "For General
          * Use" state, and that's when we'll call set_nextid(). The class driver
         /*
          * I want to see a callback happen when the OBD moves to a "For General
          * Use" state, and that's when we'll call set_nextid(). The class driver
@@ -659,11 +660,15 @@ int mds_dt_synchronize(void *data)
         if (rc)
                 GOTO(cleanup, rc);
 
         if (rc)
                 GOTO(cleanup, rc);
 
+        /* we don't set next id manually, instead OSCs will set them
+         * during own recovery from DELORPHAN reply -bzzz */
+#if 0
         vals[0] = index;
         rc = mds_dt_set_info(obd->obd_self_export, strlen("next_id"),
                              "next_id", 2, vals);
         if (rc)
                 GOTO(cleanup, rc);
         vals[0] = index;
         rc = mds_dt_set_info(obd->obd_self_export, strlen("next_id"),
                              "next_id", 2, vals);
         if (rc)
                 GOTO(cleanup, rc);
+#endif
 
         obd_llog_finish(obd, &obd->obd_llogs, old_count);
         obd_llog_cat_initialize(obd, &obd->obd_llogs, count, name);
 
         obd_llog_finish(obd, &obd->obd_llogs, old_count);
         obd_llog_cat_initialize(obd, &obd->obd_llogs, count, name);
index a23df76..e76ff94 100644 (file)
@@ -297,14 +297,6 @@ cleanup_dentry:
         return ERR_PTR(rc);
 }
 
         return ERR_PTR(rc);
 }
 
-/* this is object id allocation callback */
-static int mds_obj_alloc(obd_id *objid)
-{
-        ENTRY;
-        LASSERT(objid != NULL);
-        RETURN(++(*objid));
-}
-
 static inline void
 mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
                     struct lov_desc *desc)
 static inline void
 mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
                     struct lov_desc *desc)
@@ -319,10 +311,10 @@ mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
 
 /* must be called with i_sem held */
 int
 
 /* must be called with i_sem held */
 int
-mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
-                  int offset, struct mds_update_record *rec,
-                  struct dentry *dchild, void **handle,
-                  obd_id *ids)
+mds_create_objects(struct obd_device *obd, struct ptlrpc_request *req,
+                   int offset, struct mds_update_record *rec,
+                   struct dentry *dchild, void **handle,
+                   obd_id **ids)
 {
         struct inode *inode = dchild->d_inode;
         struct mds_obd *mds = &obd->u.mds;
 {
         struct inode *inode = dchild->d_inode;
         struct mds_obd *mds = &obd->u.mds;
@@ -346,7 +338,10 @@ mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
         if (body->valid & OBD_MD_FLEASIZE)
                 RETURN(0);
 
         if (body->valid & OBD_MD_FLEASIZE)
                 RETURN(0);
 
-        oti.oti_objid = ids;
+        OBD_ALLOC(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids));
+        if (*ids == NULL)
+                RETURN(-ENOMEM);
+        oti.oti_objid = *ids;
                 
         /* replay case */
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
                 
         /* replay case */
         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
@@ -364,15 +359,10 @@ mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
                         RETURN(rc);
                 }
 
                         RETURN(rc);
                 }
 
-                /* 
-                 * FIXME: this is evil layering violation, all things related to
-                 * stripping should be done by LOV.  --umka.
-                 */
-                mds_objids_from_lmm(ids, lmm, &mds->mds_dt_desc);
+                mds_objids_from_lmm(*ids, lmm, &mds->mds_dt_desc);
 
                 lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
                 lmm_bufsize = req->rq_repmsg->buflens[offset];
 
                 lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
                 lmm_bufsize = req->rq_repmsg->buflens[offset];
-                
                 LASSERT(lmm_buf != NULL);
                 LASSERT(lmm_bufsize >= lmm_size);
 
                 LASSERT(lmm_buf != NULL);
                 LASSERT(lmm_bufsize >= lmm_size);
 
@@ -428,14 +418,8 @@ mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
                                 GOTO(out_oa, rc);
                 }
 
                                 GOTO(out_oa, rc);
                 }
 
-                /* 
-                 * create with CROW flag and base ids for allocating new ids on
-                 * them.
-                 */
-                oti.oti_flags |= OBD_MODE_CROW;
-                oti.oti_obj_alloc = mds_obj_alloc;
-
                 LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS);
                 LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS);
+                oti.oti_flags |= OBD_MODE_CROW;
                 rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &lsm, &oti);
 
                 if (rc) {
                 rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &lsm, &oti);
 
                 if (rc) {
@@ -741,7 +725,6 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
         }
 
         if (rec != NULL) {
         }
 
         if (rec != NULL) {
-                /* no EA: create objects */
                 if ((body->valid & OBD_MD_FLEASIZE) &&
                     (rec->ur_flags & MDS_OPEN_HAS_EA)) {
                         up(&dchild->d_inode->i_sem);
                 if ((body->valid & OBD_MD_FLEASIZE) &&
                     (rec->ur_flags & MDS_OPEN_HAS_EA)) {
                         up(&dchild->d_inode->i_sem);
@@ -749,45 +732,14 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
                 }
                 
                 if (!(body->valid & OBD_MD_FLEASIZE)) {
                 }
                 
                 if (!(body->valid & OBD_MD_FLEASIZE)) {
-                        int ids_size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids);
-                        
-                        OBD_ALLOC(ids, ids_size);
-                        if (ids == NULL) {
-                                up(&dchild->d_inode->i_sem);
-                                RETURN(-ENOMEM);
-                        }
-
-                        /* 
-                         * synchronizing object creating to prevent another
-                         * threads take the same base objid values.
-                         */
-                        down(&mds->mds_create_sem);
-
-                        /* preparing base ids */
-                        mds_dt_save_objids(obd, ids);
-
-                        /* 
-                         * create objects, @ids will contain new allocated obj
-                         * ids.
-                         */
-                        rc = mds_create_object(obd, req, 2, rec,
-                                               dchild, handle, ids);
+                        /* no EA: create objects */
+                        rc = mds_create_objects(obd, req, 2, rec,
+                                                dchild, handle, &ids);
                         if (rc) {
                                 CERROR("mds_create_object: rc = %d\n", rc);
                         if (rc) {
                                 CERROR("mds_create_object: rc = %d\n", rc);
-                                up(&mds->mds_create_sem);
                                 up(&dchild->d_inode->i_sem);
                                 up(&dchild->d_inode->i_sem);
-                                OBD_FREE(ids, ids_size);
                                 RETURN(rc);
                         }
                                 RETURN(rc);
                         }
-
-                        /*
-                         * update MDS objids by new ones allocated in
-                         * mds_create_object().
-                         */
-                        mds_dt_update_objids(obd, ids);
-                        OBD_FREE(ids, ids_size);
-                        
-                        up(&mds->mds_create_sem);
                 }
                 
                 if (S_ISREG(dchild->d_inode->i_mode) &&
                 }
                 
                 if (S_ISREG(dchild->d_inode->i_mode) &&
@@ -827,6 +779,12 @@ static int mds_finish_open(struct ptlrpc_request *req, struct dentry *dchild,
 
         CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
                mfd->mfd_handle.h_cookie);
 
         CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
                mfd->mfd_handle.h_cookie);
+
+        if (ids != NULL) {
+                mds_dt_update_objids(obd, ids);
+                OBD_FREE(ids, sizeof(*ids) * mds->mds_dt_desc.ld_tgt_count);
+        }
+
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
index c6ce804..80a55f7 100644 (file)
@@ -2509,6 +2509,9 @@ filter_clear_orphans(struct obd_export *exp, struct obdo *oa)
         CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "
                LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
 
         CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "
                LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
 
+        /* return next free id to be used as a new start of sequence -bzzz */
+        oa->o_id = last + 1;
+
         filter_set_last_id(filter, oa->o_gr, oa->o_id);
         clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
         up(&filter->fo_create_locks[oa->o_gr]);
         filter_set_last_id(filter, oa->o_gr, oa->o_id);
         clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
         up(&filter->fo_create_locks[oa->o_gr]);
index 722cfc7..3a25414 100644 (file)
 #include <linux/obd_class.h>
 #include "osc_internal.h"
 
 #include <linux/obd_class.h>
 #include "osc_internal.h"
 
+int oscc_recovering(struct osc_creator *oscc)
+{
+        int recov = 0;
+
+        spin_lock(&oscc->oscc_lock);
+        recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
+        spin_unlock(&oscc->oscc_lock);
+
+        return recov;
+}
+
 /* this only is used now for deleting orphanes */
 int osc_create(struct obd_export *exp, struct obdo *oa,
                void *acl, int acl_size, struct lov_stripe_md **ea,
                struct obd_trans_info *oti)
 {
         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
 /* this only is used now for deleting orphanes */
 int osc_create(struct obd_export *exp, struct obdo *oa,
                void *acl, int acl_size, struct lov_stripe_md **ea,
                struct obd_trans_info *oti)
 {
         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
-        int rc = 0;
+        int rc = 0, try_again = 1;
         ENTRY;
 
         LASSERT(oa);
         ENTRY;
 
         LASSERT(oa);
@@ -117,8 +128,51 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 RETURN(rc);
         }
 
                 RETURN(rc);
         }
 
-        LBUG();
-        RETURN(0);
+        while (try_again) {
+                /* If orphans are being recovered, then we must wait until
+                   it is finished before we can continue with create. */
+                if (oscc_recovering(oscc)) {
+                        struct l_wait_info lwi;
+
+                        CDEBUG(D_HA,"%p: oscc recovery in progress, waiting\n",
+                               oscc);
+
+                        lwi = LWI_TIMEOUT(MAX(obd_timeout*HZ/4, 1), NULL, NULL);
+                        rc = l_wait_event(oscc->oscc_waitq,
+                                          !oscc_recovering(oscc), &lwi);
+                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
+                        if (rc == -ETIMEDOUT) {
+                                CDEBUG(D_HA,"%p: timeout waiting on recovery\n",
+                                       oscc);
+                                RETURN(rc);
+                        }
+                        CDEBUG(D_HA, "%p: oscc recovery over, waking up\n",
+                               oscc);
+                }
+
+                spin_lock(&oscc->oscc_lock);
+                if (oscc->oscc_flags & OSCC_FLAG_EXITING) {
+                        spin_unlock(&oscc->oscc_lock);
+                        break;
+                }
+
+                if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
+                        rc = -ENOSPC;
+                        spin_unlock(&oscc->oscc_lock);
+                        break;
+                }
+
+                oscc->oscc_next_id++;
+                oa->o_id = oscc->oscc_next_id;
+                try_again = 0;
+                spin_unlock(&oscc->oscc_lock);
+
+                CDEBUG(D_HA, "%s: returning objid "LPU64"\n",
+                       oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid,
+                       oa->o_id);
+        }
+
+        RETURN(rc);
 }
 
 void oscc_init(struct obd_device *obd)
 }
 
 void oscc_init(struct obd_device *obd)
@@ -134,4 +188,5 @@ void oscc_init(struct obd_device *obd)
         oscc->oscc_obd = obd;
         spin_lock_init(&oscc->oscc_lock);
         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
         oscc->oscc_obd = obd;
         spin_lock_init(&oscc->oscc_lock);
         oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+        init_waitqueue_head(&oscc->oscc_waitq);
 }
 }
index 12a42e5..2d565f6 100644 (file)
@@ -292,6 +292,7 @@ out:
 int osc_real_create(struct obd_export *exp, struct obdo *oa,
                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
 int osc_real_create(struct obd_export *exp, struct obdo *oa,
                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
 {
+        struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
         struct ptlrpc_request *request;
         struct ost_body *body;
         struct lov_stripe_md *lsm;
         struct ptlrpc_request *request;
         struct ost_body *body;
         struct lov_stripe_md *lsm;
@@ -337,6 +338,16 @@ int osc_real_create(struct obd_export *exp, struct obdo *oa,
                 GOTO (out_req, rc = -EPROTO);
         }
 
                 GOTO (out_req, rc = -EPROTO);
         }
 
+        if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) {
+                struct obd_import *imp = class_exp2cliimp(exp);
+                /* MDS declares last known object, OSS responses
+                 * with next possible object -bzzz */
+                spin_lock(&oscc->oscc_lock);
+                oscc->oscc_next_id = body->oa.o_id;
+                spin_unlock(&oscc->oscc_lock);
+                CDEBUG(D_HA, "%s: set nextid "LPD64" after recovery\n",
+                       imp->imp_target_uuid.uuid, oa->o_id);
+        }
         memcpy(oa, &body->oa, sizeof(*oa));
 
         /* This should really be sent by the OST */
         memcpy(oa, &body->oa, sizeof(*oa));
 
         /* This should really be sent by the OST */