Whamcloud - gitweb
b=23074 improve autogen.sh
[fs/lustre-release.git] / lustre / osc / osc_create.c
index 4cee7c6..584a730 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -95,7 +95,7 @@ static int osc_interpret_create(const struct lu_env *env,
         switch (rc) {
         case 0: {
                 if (body) {
-                        int diff = body->oa.o_id - oscc->oscc_last_id;
+                        int diff =ostid_id(&body->oa.o_oi)- oscc->oscc_last_id;
 
                         /* oscc_internal_create() stores the original value of
                          * grow_count in rq_async_args.space[0].
@@ -117,7 +117,7 @@ static int osc_interpret_create(const struct lu_env *env,
                                  * next time if needed */
                                 oscc->oscc_flags &= ~OSCC_FLAG_LOW;
                         }
-                        oscc->oscc_last_id = body->oa.o_id;
+                        oscc->oscc_last_id = ostid_id(&body->oa.o_oi);
                 }
                 cfs_spin_unlock(&oscc->oscc_lock);
                 break;
@@ -196,8 +196,8 @@ static int oscc_internal_create(struct osc_creator *oscc)
 
         LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
 
-        if ((oscc->oscc_flags & OSCC_FLAG_RECOVERING) ||
-            (oscc->oscc_flags & OSCC_FLAG_DEGRADED)) {
+        /* Do not check for a degraded OST here - bug21563/bug18539 */
+        if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
                 cfs_spin_unlock(&oscc->oscc_lock);
                 RETURN(0);
         }
@@ -238,9 +238,18 @@ static int oscc_internal_create(struct osc_creator *oscc)
         body = req_capsule_client_get(&request->rq_pill, &RMF_OST_BODY);
 
         cfs_spin_lock(&oscc->oscc_lock);
-        body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
-        body->oa.o_gr = oscc->oscc_oa.o_gr;
-        LASSERT_MDS_GROUP(body->oa.o_gr);
+
+        if (likely(fid_seq_is_mdt(oscc->oscc_oa.o_seq))) {
+                body->oa.o_oi.oi_seq = oscc->oscc_oa.o_seq;
+                body->oa.o_oi.oi_id  = oscc->oscc_last_id +
+                                       oscc->oscc_grow_count;
+        } else {
+                /*Just warning here currently, since not sure how fid-on-ost
+                 *will be implemented here */
+                CWARN("o_seq: "LPU64" is not indicate any MDTs.\n",
+                       oscc->oscc_oa.o_seq);
+        }
+
         body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
         request->rq_async_args.space[0] = oscc->oscc_grow_count;
         cfs_spin_unlock(&oscc->oscc_lock);
@@ -327,8 +336,9 @@ static int oscc_in_sync(struct osc_creator *oscc)
 }
 
 /* decide if the OST has remaining object, return value :
-        0 : the OST has remaining object, and don't need to do precreate.
-        1 : the OST has no remaining object, and will send a RPC for precreate.
+        0 : the OST has remaining objects, may or may not send precreation RPC.
+        1 : the OST has no remaining object, and the sent precreation RPC
+            has not been completed yet.
         2 : the OST has no remaining object, and will not get any for
             a potentially very long time
      1000 : unusable
@@ -337,41 +347,44 @@ int osc_precreate(struct obd_export *exp)
 {
         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
         struct obd_import *imp = exp->exp_imp_reverse;
+        int rc;
         ENTRY;
 
         LASSERT(oscc != NULL);
         if (imp != NULL && imp->imp_deactive)
-                RETURN(1000);
+                GOTO(out_nolock, rc = 1000);
 
         /* Handle critical states first */
         cfs_spin_lock(&oscc->oscc_lock);
         if (oscc->oscc_flags & OSCC_FLAG_NOSPC ||
             oscc->oscc_flags & OSCC_FLAG_RDONLY ||
-            oscc->oscc_flags & OSCC_FLAG_EXITING) {
-                cfs_spin_unlock(&oscc->oscc_lock);
-                RETURN(1000);
-        }
+            oscc->oscc_flags & OSCC_FLAG_EXITING)
+                GOTO(out, rc = 1000);
 
-        if (oscc->oscc_flags & OSCC_FLAG_RECOVERING ||
-            oscc->oscc_flags & OSCC_FLAG_DEGRADED) {
-                cfs_spin_unlock(&oscc->oscc_lock);
-                RETURN(2);
-        }
+        if ((oscc->oscc_flags & OSCC_FLAG_RECOVERING) ||
+            (oscc->oscc_flags & OSCC_FLAG_DEGRADED))
+                GOTO(out, rc = 2);
 
-        if (oscc_has_objects_nolock(oscc, oscc->oscc_grow_count / 2)) {
-                cfs_spin_unlock(&oscc->oscc_lock);
-                RETURN(0);
-        }
+        if (oscc_has_objects_nolock(oscc, oscc->oscc_grow_count / 2))
+                GOTO(out, rc = 0);
 
-        if ((oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) ||
-            (oscc->oscc_flags & OSCC_FLAG_CREATING)) {
-                cfs_spin_unlock(&oscc->oscc_lock);
-                RETURN(1);
-        }
+        /* Return 0, if we have at least one object - bug 22884 */
+        rc = oscc_has_objects_nolock(oscc, 1) ? 0 : 1;
+
+        /* Do not check for OSCC_FLAG_CREATING flag here, let
+         * osc_precreate() call oscc_internal_create() and
+         * adjust oscc_grow_count bug21563 */
+        if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS)
+                GOTO(out, rc);
 
         if (oscc_internal_create(oscc))
-                RETURN(1000);
-        RETURN(1);
+                GOTO(out_nolock, rc = 1000);
+
+        RETURN(rc);
+out:
+        cfs_spin_unlock(&oscc->oscc_lock);
+out_nolock:
+        return rc;
 }
 
 static int handle_async_create(struct ptlrpc_request *req, int rc)
@@ -450,7 +463,7 @@ int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
         struct obdo *oa = oinfo->oi_oa;
         ENTRY;
 
-        if ((oa->o_valid & OBD_MD_FLGROUP) && !filter_group_is_mds(oa->o_gr)) {
+        if ((oa->o_valid & OBD_MD_FLGROUP) && !fid_seq_is_mdt(oa->o_seq)) {
                 rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
                 rc = oinfo->oi_cb_up(oinfo, rc);
                 RETURN(rc);
@@ -524,7 +537,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 RETURN(osc_real_create(exp, oa, ea, oti));
         }
 
-        if (!filter_group_is_mds(oa->o_gr))
+        if (!fid_seq_is_mdt(oa->o_seq))
                 RETURN(osc_real_create(exp, oa, ea, oti));
 
         /* this is the special case where create removes orphans */
@@ -568,6 +581,11 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         oscc->oscc_last_id = oa->o_id;
                         ocd = &imp->imp_connect_data;
                         if (ocd->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
+                                /*
+                                 * The OST reports back in oa->o_id from where
+                                 * we should restart in order to skip orphan
+                                 * objects
+                                 */
                                 CDEBUG(D_HA, "%s: Skip orphan set, reset last "
                                        "objid\n", oscc->oscc_obd->obd_name);
                                 oscc->oscc_next_id = oa->o_id + 1;
@@ -587,7 +605,8 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 cfs_waitq_signal(&oscc->oscc_waitq);
                 cfs_spin_unlock(&oscc->oscc_lock);
 
-                RETURN(rc);
+                if (rc < 0)
+                        RETURN(rc);
         }
 
         lsm = *ea;
@@ -635,6 +654,18 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         break;
                 }
 
+                /**
+                 * If this is DELORPHAN process, no need create object here,
+                 * otherwise this will create a gap of object id, and MDS
+                 * might create some orphan log (mds_lov_update_objids), then
+                 * remove objects wrongly on OST. Bug 21379.
+                 */
+                if (oa->o_valid & OBD_MD_FLFLAGS &&
+                        oa->o_flags == OBD_FL_DELORPHAN) {
+                        cfs_spin_unlock(&oscc->oscc_lock);
+                        break;
+                }
+
                 if (oscc_has_objects_nolock(oscc, 1)) {
                         memcpy(oa, &oscc->oscc_oa, sizeof(*oa));
                         oa->o_id = oscc->oscc_next_id;