Whamcloud - gitweb
b=23074 improve autogen.sh
[fs/lustre-release.git] / lustre / osc / osc_create.c
index 17fcbd8..584a730 100644 (file)
@@ -26,7 +26,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  */
 /*
@@ -90,12 +90,12 @@ static int osc_interpret_create(const struct lu_env *env,
         oscc = req->rq_async_args.pointer_arg[0];
         LASSERT(oscc && (oscc->oscc_obd != LP_POISON));
 
-        spin_lock(&oscc->oscc_lock);
+        cfs_spin_lock(&oscc->oscc_lock);
         oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
         switch (rc) {
         case 0: {
                 if (body) {
-                        int diff = body->oa.o_id - oscc->oscc_last_id;
+                        int diff =ostid_id(&body->oa.o_oi)- oscc->oscc_last_id;
 
                         /* oscc_internal_create() stores the original value of
                          * grow_count in rq_async_args.space[0].
@@ -117,9 +117,9 @@ static int osc_interpret_create(const struct lu_env *env,
                                  * next time if needed */
                                 oscc->oscc_flags &= ~OSCC_FLAG_LOW;
                         }
-                        oscc->oscc_last_id = body->oa.o_id;
+                        oscc->oscc_last_id = ostid_id(&body->oa.o_oi);
                 }
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
                 break;
         }
         case -EROFS:
@@ -133,7 +133,7 @@ static int osc_interpret_create(const struct lu_env *env,
                                 oscc->oscc_grow_count = OST_MIN_PRECREATE;
                         }
                 }
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
                 DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
                 break;
         case -EIO: {
@@ -141,22 +141,26 @@ static int osc_interpret_create(const struct lu_env *env,
                  * of filter (see filter_handle_precreate for detail)*/
                 if (body && body->oa.o_id > oscc->oscc_last_id)
                         oscc->oscc_last_id = body->oa.o_id;
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
                 break;
         }
+        case -EINTR:
         case -EWOULDBLOCK: {
                 /* aka EAGAIN we should not delay create if import failed -
                  * this avoid client stick in create and avoid race with
                  * delorphan */
+                /* EINTR say - old create request is killed due mds<>ost
+                 * eviction - OSCC_FLAG_RECOVERING can already set due
+                 * IMP_DISCONN event */
                 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                 /* oscc->oscc_grow_count = OST_MIN_PRECREATE; */
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
                 break;
         }
         default: {
                 oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                 oscc->oscc_grow_count = OST_MIN_PRECREATE;
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
                 DEBUG_REQ(D_ERROR, req,
                           "Unknown rc %d from async create: failing oscc", rc);
                 ptlrpc_fail_import(req->rq_import,
@@ -167,9 +171,9 @@ static int osc_interpret_create(const struct lu_env *env,
         CDEBUG(D_HA, "preallocated through id "LPU64" (next to use "LPU64")\n",
                oscc->oscc_last_id, oscc->oscc_next_id);
 
-        spin_lock(&oscc->oscc_lock);
-        list_for_each_entry_safe(fake_req, pos,
-                                 &oscc->oscc_wait_create_list, rq_list) {
+        cfs_spin_lock(&oscc->oscc_lock);
+        cfs_list_for_each_entry_safe(fake_req, pos,
+                                     &oscc->oscc_wait_create_list, rq_list) {
                 if (handle_async_create(fake_req, rc)  == -EAGAIN) {
                         oscc_internal_create(oscc);
                         /* sending request should be never fail because
@@ -177,7 +181,7 @@ static int osc_interpret_create(const struct lu_env *env,
                         GOTO(exit_wakeup, rc);
                 }
         }
-        spin_unlock(&oscc->oscc_lock);
+        cfs_spin_unlock(&oscc->oscc_lock);
 
 exit_wakeup:
         cfs_waitq_signal(&oscc->oscc_waitq);
@@ -192,9 +196,9 @@ static int oscc_internal_create(struct osc_creator *oscc)
 
         LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
 
-        if ((oscc->oscc_flags & OSCC_FLAG_RECOVERING) ||
-            (oscc->oscc_flags & OSCC_FLAG_DEGRADED)) {
-                spin_unlock(&oscc->oscc_lock);
+        /* Do not check for a degraded OST here - bug21563/bug18539 */
+        if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+                cfs_spin_unlock(&oscc->oscc_lock);
                 RETURN(0);
         }
 
@@ -209,7 +213,7 @@ static int oscc_internal_create(struct osc_creator *oscc)
         }
 
         if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
                 RETURN(0);
         }
 
@@ -217,15 +221,15 @@ static int oscc_internal_create(struct osc_creator *oscc)
                 oscc->oscc_grow_count = oscc->oscc_max_grow_count / 2;
 
         oscc->oscc_flags |= OSCC_FLAG_CREATING;
-        spin_unlock(&oscc->oscc_lock);
+        cfs_spin_unlock(&oscc->oscc_lock);
 
         request = ptlrpc_request_alloc_pack(oscc->oscc_obd->u.cli.cl_import,
                                             &RQF_OST_CREATE,
                                             LUSTRE_OST_VERSION, OST_CREATE);
         if (request == NULL) {
-                spin_lock(&oscc->oscc_lock);
+                cfs_spin_lock(&oscc->oscc_lock);
                 oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
                 RETURN(-ENOMEM);
         }
 
@@ -233,13 +237,22 @@ static int oscc_internal_create(struct osc_creator *oscc)
         ptlrpc_at_set_req_timeout(request);
         body = req_capsule_client_get(&request->rq_pill, &RMF_OST_BODY);
 
-        spin_lock(&oscc->oscc_lock);
-        body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
-        body->oa.o_gr = oscc->oscc_oa.o_gr;
-        LASSERT_MDS_GROUP(body->oa.o_gr);
+        cfs_spin_lock(&oscc->oscc_lock);
+
+        if (likely(fid_seq_is_mdt(oscc->oscc_oa.o_seq))) {
+                body->oa.o_oi.oi_seq = oscc->oscc_oa.o_seq;
+                body->oa.o_oi.oi_id  = oscc->oscc_last_id +
+                                       oscc->oscc_grow_count;
+        } else {
+                /*Just warning here currently, since not sure how fid-on-ost
+                 *will be implemented here */
+                CWARN("o_seq: "LPU64" is not indicate any MDTs.\n",
+                       oscc->oscc_oa.o_seq);
+        }
+
         body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
         request->rq_async_args.space[0] = oscc->oscc_grow_count;
-        spin_unlock(&oscc->oscc_lock);
+        cfs_spin_unlock(&oscc->oscc_lock);
         CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
                body->oa.o_id, oscc->oscc_last_id);
 
@@ -265,9 +278,9 @@ static int oscc_has_objects(struct osc_creator *oscc, int count)
 {
         int have_objs;
 
-        spin_lock(&oscc->oscc_lock);
+        cfs_spin_lock(&oscc->oscc_lock);
         have_objs = oscc_has_objects_nolock(oscc, count);
-        spin_unlock(&oscc->oscc_lock);
+        cfs_spin_unlock(&oscc->oscc_lock);
 
         return have_objs;
 }
@@ -279,7 +292,7 @@ static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
 
         ost_unusable = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
 
-        spin_lock(&oscc->oscc_lock);
+        cfs_spin_lock(&oscc->oscc_lock);
         ost_unusable |= (OSCC_FLAG_NOSPC | OSCC_FLAG_RDONLY |
                          OSCC_FLAG_EXITING) & oscc->oscc_flags;
         have_objs = oscc_has_objects_nolock(oscc, count);
@@ -288,7 +301,7 @@ static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
                 /* they release lock himself */
                 have_objs = oscc_internal_create(oscc);
         else
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
 
         return have_objs || ost_unusable;
 }
@@ -315,16 +328,17 @@ static int oscc_in_sync(struct osc_creator *oscc)
 {
         int sync;
 
-        spin_lock(&oscc->oscc_lock);
+        cfs_spin_lock(&oscc->oscc_lock);
         sync = oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS;
-        spin_unlock(&oscc->oscc_lock);
+        cfs_spin_unlock(&oscc->oscc_lock);
 
         return sync;
 }
 
 /* decide if the OST has remaining object, return value :
-        0 : the OST has remaining object, and don't need to do precreate.
-        1 : the OST has no remaining object, and will send a RPC for precreate.
+        0 : the OST has remaining objects, may or may not send precreation RPC.
+        1 : the OST has no remaining object, and the sent precreation RPC
+            has not been completed yet.
         2 : the OST has no remaining object, and will not get any for
             a potentially very long time
      1000 : unusable
@@ -333,41 +347,44 @@ int osc_precreate(struct obd_export *exp)
 {
         struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
         struct obd_import *imp = exp->exp_imp_reverse;
+        int rc;
         ENTRY;
 
         LASSERT(oscc != NULL);
         if (imp != NULL && imp->imp_deactive)
-                RETURN(1000);
+                GOTO(out_nolock, rc = 1000);
 
         /* Handle critical states first */
-        spin_lock(&oscc->oscc_lock);
+        cfs_spin_lock(&oscc->oscc_lock);
         if (oscc->oscc_flags & OSCC_FLAG_NOSPC ||
             oscc->oscc_flags & OSCC_FLAG_RDONLY ||
-            oscc->oscc_flags & OSCC_FLAG_EXITING) {
-                spin_unlock(&oscc->oscc_lock);
-                RETURN(1000);
-        }
+            oscc->oscc_flags & OSCC_FLAG_EXITING)
+                GOTO(out, rc = 1000);
 
-        if (oscc->oscc_flags & OSCC_FLAG_RECOVERING ||
-            oscc->oscc_flags & OSCC_FLAG_DEGRADED) {
-                spin_unlock(&oscc->oscc_lock);
-                RETURN(2);
-        }
+        if ((oscc->oscc_flags & OSCC_FLAG_RECOVERING) ||
+            (oscc->oscc_flags & OSCC_FLAG_DEGRADED))
+                GOTO(out, rc = 2);
 
-        if (oscc_has_objects_nolock(oscc, oscc->oscc_grow_count / 2)) {
-                spin_unlock(&oscc->oscc_lock);
-                RETURN(0);
-        }
+        if (oscc_has_objects_nolock(oscc, oscc->oscc_grow_count / 2))
+                GOTO(out, rc = 0);
 
-        if ((oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) ||
-            (oscc->oscc_flags & OSCC_FLAG_CREATING)) {
-                spin_unlock(&oscc->oscc_lock);
-                RETURN(1);
-        }
+        /* Return 0, if we have at least one object - bug 22884 */
+        rc = oscc_has_objects_nolock(oscc, 1) ? 0 : 1;
+
+        /* Do not check for OSCC_FLAG_CREATING flag here, let
+         * osc_precreate() call oscc_internal_create() and
+         * adjust oscc_grow_count bug21563 */
+        if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS)
+                GOTO(out, rc);
 
         if (oscc_internal_create(oscc))
-                RETURN(1000);
-        RETURN(1);
+                GOTO(out_nolock, rc = 1000);
+
+        RETURN(rc);
+out:
+        cfs_spin_unlock(&oscc->oscc_lock);
+out_nolock:
+        return rc;
 }
 
 static int handle_async_create(struct ptlrpc_request *req, int rc)
@@ -422,15 +439,16 @@ out_wake:
 }
 
 static int async_create_interpret(const struct lu_env *env,
-                                  struct ptlrpc_request *req, void *data, int rc)
+                                  struct ptlrpc_request *req, void *data,
+                                  int rc)
 {
         struct osc_create_async_args *args = ptlrpc_req_async_args(req);
         struct osc_creator    *oscc = args->rq_oscc;
         int ret;
 
-        spin_lock(&oscc->oscc_lock);
+        cfs_spin_lock(&oscc->oscc_lock);
         ret = handle_async_create(req, rc);
-        spin_unlock(&oscc->oscc_lock);
+        cfs_spin_unlock(&oscc->oscc_lock);
 
         return ret;
 }
@@ -445,7 +463,7 @@ int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
         struct obdo *oa = oinfo->oi_oa;
         ENTRY;
 
-        if ((oa->o_valid & OBD_MD_FLGROUP) && !filter_group_is_mds(oa->o_gr)) {
+        if ((oa->o_valid & OBD_MD_FLGROUP) && !fid_seq_is_mdt(oa->o_seq)) {
                 rc = osc_real_create(exp, oinfo->oi_oa, ea, oti);
                 rc = oinfo->oi_cb_up(oinfo, rc);
                 RETURN(rc);
@@ -475,7 +493,7 @@ int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
         args->rq_lsm   = *ea;
         args->rq_oinfo = oinfo;
 
-        spin_lock(&oscc->oscc_lock);
+        cfs_spin_lock(&oscc->oscc_lock);
         /* try fast path */
         rc = handle_async_create(fake_req, 0);
         if (rc == -EAGAIN) {
@@ -483,12 +501,12 @@ int osc_create_async(struct obd_export *exp, struct obd_info *oinfo,
                 /* we not have objects - try wait */
                 is_add = ptlrpcd_add_req(fake_req, PSCOPE_OTHER);
                 if (!is_add)
-                        list_add(&fake_req->rq_list,
-                                 &oscc->oscc_wait_create_list);
+                        cfs_list_add(&fake_req->rq_list,
+                                     &oscc->oscc_wait_create_list);
                 else
                         rc = is_add;
         }
-        spin_unlock(&oscc->oscc_lock);
+        cfs_spin_unlock(&oscc->oscc_lock);
 
         if (rc != -EAGAIN)
                 /* need free request if was error hit or
@@ -519,19 +537,19 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 RETURN(osc_real_create(exp, oa, ea, oti));
         }
 
-        if (!filter_group_is_mds(oa->o_gr))
+        if (!fid_seq_is_mdt(oa->o_seq))
                 RETURN(osc_real_create(exp, oa, ea, oti));
 
         /* this is the special case where create removes orphans */
         if (oa->o_valid & OBD_MD_FLFLAGS &&
             oa->o_flags == OBD_FL_DELORPHAN) {
-                spin_lock(&oscc->oscc_lock);
+                cfs_spin_lock(&oscc->oscc_lock);
                 if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) {
-                        spin_unlock(&oscc->oscc_lock);
+                        cfs_spin_unlock(&oscc->oscc_lock);
                         RETURN(-EBUSY);
                 }
                 if (!(oscc->oscc_flags & OSCC_FLAG_RECOVERING)) {
-                        spin_unlock(&oscc->oscc_lock);
+                        cfs_spin_unlock(&oscc->oscc_lock);
                         RETURN(0);
                 }
 
@@ -539,7 +557,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 /* seting flag LOW we prevent extra grow precreate size
                  * and enforce use last assigned size */
                 oscc->oscc_flags |= OSCC_FLAG_LOW;
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
                 CDEBUG(D_HA, "%s: oscc recovery started - delete to "LPU64"\n",
                        oscc->oscc_obd->obd_name, oscc->oscc_next_id - 1);
 
@@ -551,7 +569,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
 
                 rc = osc_real_create(exp, oa, ea, NULL);
 
-                spin_lock(&oscc->oscc_lock);
+                cfs_spin_lock(&oscc->oscc_lock);
                 oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS;
                 if (rc == 0 || rc == -ENOSPC) {
                         struct obd_connect_data *ocd;
@@ -563,6 +581,11 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         oscc->oscc_last_id = oa->o_id;
                         ocd = &imp->imp_connect_data;
                         if (ocd->ocd_connect_flags & OBD_CONNECT_SKIP_ORPHAN) {
+                                /*
+                                 * The OST reports back in oa->o_id from where
+                                 * we should restart in order to skip orphan
+                                 * objects
+                                 */
                                 CDEBUG(D_HA, "%s: Skip orphan set, reset last "
                                        "objid\n", oscc->oscc_obd->obd_name);
                                 oscc->oscc_next_id = oa->o_id + 1;
@@ -580,7 +603,7 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                 }
 
                 cfs_waitq_signal(&oscc->oscc_waitq);
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
 
                 if (rc < 0)
                         RETURN(rc);
@@ -603,31 +626,43 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         CDEBUG(D_HA,"%s: error create %d\n",
                                oscc->oscc_obd->obd_name, rc);
 
-                spin_lock(&oscc->oscc_lock);
+                cfs_spin_lock(&oscc->oscc_lock);
 
                 /* wakeup but recovery did not finished */
                 if ((oscc->oscc_obd->u.cli.cl_import->imp_invalid) ||
                     (oscc->oscc_flags & OSCC_FLAG_RECOVERING)) {
                         rc = -EIO;
-                        spin_unlock(&oscc->oscc_lock);
+                        cfs_spin_unlock(&oscc->oscc_lock);
                         break;
                 }
 
                 if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
                         rc = -ENOSPC;
-                        spin_unlock(&oscc->oscc_lock);
+                        cfs_spin_unlock(&oscc->oscc_lock);
                         break;
                 }
 
                 if (oscc->oscc_flags & OSCC_FLAG_RDONLY) {
                         rc = -EROFS;
-                        spin_unlock(&oscc->oscc_lock);
+                        cfs_spin_unlock(&oscc->oscc_lock);
                         break;
                 }
 
                 // Should we report -EIO error ?
                 if (oscc->oscc_flags & OSCC_FLAG_EXITING) {
-                        spin_unlock(&oscc->oscc_lock);
+                        cfs_spin_unlock(&oscc->oscc_lock);
+                        break;
+                }
+
+                /**
+                 * If this is DELORPHAN process, no need create object here,
+                 * otherwise this will create a gap of object id, and MDS
+                 * might create some orphan log (mds_lov_update_objids), then
+                 * remove objects wrongly on OST. Bug 21379.
+                 */
+                if (oa->o_valid & OBD_MD_FLFLAGS &&
+                        oa->o_flags == OBD_FL_DELORPHAN) {
+                        cfs_spin_unlock(&oscc->oscc_lock);
                         break;
                 }
 
@@ -637,14 +672,14 @@ int osc_create(struct obd_export *exp, struct obdo *oa,
                         lsm->lsm_object_id = oscc->oscc_next_id;
                         *ea = lsm;
                         oscc->oscc_next_id++;
-                        spin_unlock(&oscc->oscc_lock);
+                        cfs_spin_unlock(&oscc->oscc_lock);
 
                         CDEBUG(D_RPCTRACE, "%s: set oscc_next_id = "LPU64"\n",
                                exp->exp_obd->obd_name, oscc->oscc_next_id);
                         break;
                 }
 
-                spin_unlock(&oscc->oscc_lock);
+                cfs_spin_unlock(&oscc->oscc_lock);
         }
 
         if (rc == 0) {
@@ -672,7 +707,7 @@ void oscc_init(struct obd_device *obd)
         memset(oscc, 0, sizeof(*oscc));
 
         cfs_waitq_init(&oscc->oscc_waitq);
-        spin_lock_init(&oscc->oscc_lock);
+        cfs_spin_lock_init(&oscc->oscc_lock);
         oscc->oscc_obd = obd;
         oscc->oscc_grow_count = OST_MIN_PRECREATE;
         oscc->oscc_max_grow_count = OST_MAX_PRECREATE;
@@ -693,8 +728,8 @@ void oscc_fini(struct obd_device *obd)
         ENTRY;
 
 
-        spin_lock(&oscc->oscc_lock);
+        cfs_spin_lock(&oscc->oscc_lock);
         oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;
         oscc->oscc_flags |= OSCC_FLAG_EXITING;
-        spin_unlock(&oscc->oscc_lock);
+        cfs_spin_unlock(&oscc->oscc_lock);
 }