Whamcloud - gitweb
LU-2714 hsm: limit MDT-side allocations for HSM RPCs
[fs/lustre-release.git] / lustre / ofd / ofd_obd.c
index ca44509..5bd258f 100644 (file)
@@ -146,7 +146,7 @@ static int ofd_parse_connect_data(const struct lu_env *env,
        fed->fed_group = data->ocd_group;
 
        data->ocd_connect_flags &= OST_CONNECT_SUPPORTED;
-       exp->exp_connect_flags = data->ocd_connect_flags;
+       exp->exp_connect_data = *data;
        data->ocd_version = LUSTRE_VERSION_CODE;
 
        /* Kindly make sure the SKIP_ORPHAN flag is from MDS. */
@@ -165,7 +165,7 @@ static int ofd_parse_connect_data(const struct lu_env *env,
                data->ocd_grant_extent = ofd->ofd_dt_conf.ddp_grant_frag >> 10;
        }
 
-       if (exp->exp_connect_flags & OBD_CONNECT_GRANT)
+       if (exp_connect_flags(exp) & OBD_CONNECT_GRANT)
                data->ocd_grant = ofd_grant_connect(env, exp, data->ocd_grant);
 
        if (data->ocd_connect_flags & OBD_CONNECT_INDEX) {
@@ -193,7 +193,7 @@ static int ofd_parse_connect_data(const struct lu_env *env,
                data->ocd_brw_size = 65536;
        } else if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
                data->ocd_brw_size = min(data->ocd_brw_size,
-                             (__u32)(PTLRPC_MAX_BRW_PAGES << CFS_PAGE_SHIFT));
+                                        (__u32)DT_MAX_BRW_SIZE);
                if (data->ocd_brw_size == 0) {
                        CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
                               " ocd_version: %x ocd_grant: %d ocd_index: %u "
@@ -237,7 +237,17 @@ static int ofd_parse_connect_data(const struct lu_env *env,
        if (data->ocd_connect_flags & OBD_CONNECT_MAXBYTES)
                data->ocd_maxbytes = ofd->ofd_dt_conf.ddp_maxbytes;
 
-        RETURN(0);
+       if (data->ocd_connect_flags & OBD_CONNECT_PINGLESS) {
+               if (suppress_pings) {
+                       spin_lock(&exp->exp_obd->obd_dev_lock);
+                       list_del_init(&exp->exp_obd_chain_timed);
+                       spin_unlock(&exp->exp_obd->obd_dev_lock);
+               } else {
+                       data->ocd_connect_flags &= ~OBD_CONNECT_PINGLESS;
+               }
+       }
+
+       RETURN(0);
 }
 
 static int ofd_obd_reconnect(const struct lu_env *env, struct obd_export *exp,
@@ -409,7 +419,7 @@ static int ofd_destroy_export(struct obd_export *exp)
        ofd_grant_discard(exp);
        ofd_fmd_cleanup(exp);
 
-       if (exp->exp_connect_flags & OBD_CONNECT_GRANT_SHRINK) {
+       if (exp_connect_flags(exp) & OBD_CONNECT_GRANT_SHRINK) {
                if (ofd->ofd_tot_granted_clients > 0)
                        ofd->ofd_tot_granted_clients --;
        }
@@ -567,7 +577,7 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp,
                ofd_info_init(env, exp);
                oseq = ofd_seq_load(env, ofd,
                                    (obd_seq)exp->exp_filter_data.fed_group);
-               LASSERT(oseq != NULL);
+               LASSERT(!IS_ERR(oseq));
                if (last_id) {
                        if (*vallen < sizeof(*last_id)) {
                                ofd_seq_put(env, oseq);
@@ -620,36 +630,39 @@ static int ofd_get_info(const struct lu_env *env, struct obd_export *exp,
                *((__u32 *) val) = ofd->ofd_sync_lock_cancel;
                *vallen = sizeof(__u32);
        } else if (KEY_IS(KEY_LAST_FID)) {
-               struct lu_env      env;
-               struct ofd_device *ofd = ofd_exp(exp);
-               struct ofd_seq    *oseq;
-               struct lu_fid     *last_fid = val;
-               int             rc;
-
-               if (last_fid == NULL) {
-                       *vallen = sizeof(struct lu_fid);
+               struct lu_env           env;
+               struct ofd_device       *ofd = ofd_exp(exp);
+               struct ofd_seq          *oseq;
+               struct ost_id           *oid = val;
+               int                     rc;
+
+               if (oid == NULL) {
+                       *vallen = sizeof(struct ost_id);
                        RETURN(0);
                }
 
-               if (*vallen < sizeof(*last_fid))
+               if (*vallen < sizeof(*oid))
                        RETURN(-EOVERFLOW);
 
                rc = lu_env_init(&env, LCT_DT_THREAD);
                if (rc != 0)
                        RETURN(rc);
                ofd_info_init(&env, exp);
-               fid_le_to_cpu(last_fid, last_fid);
-               oseq = ofd_seq_load(&env, ofd, fid_seq(last_fid));
+
+               ostid_le_to_cpu(oid, oid);
+               CDEBUG(D_HA, "Get LAST FID for seq "LPX64"\n", oid->oi_seq);
+
+               oseq = ofd_seq_load(&env, ofd, oid->oi_seq);
                if (IS_ERR(oseq))
-                       GOTO(out_fid, rc = PTR_ERR(oseq));
+                       GOTO(out_fini, rc = PTR_ERR(oseq));
 
-               last_fid->f_seq = oseq->os_seq;
-               last_fid->f_oid = oseq->os_last_oid;
-               fid_cpu_to_le(last_fid, last_fid);
+               CDEBUG(D_HA, "LAST FID is "POSTID"\n", oseq->os_last_oid,
+                      oseq->os_seq);
 
-               *vallen = sizeof(*last_fid);
+               *oid = oseq->os_oi;
+               *vallen = sizeof(*oid);
                ofd_seq_put(&env, oseq);
-out_fid:
+out_fini:
                lu_env_fini(&env);
        } else {
                CERROR("Not supported key %s\n", (char*)key);
@@ -769,7 +782,7 @@ static int ofd_statfs(const struct lu_env *env,  struct obd_export *exp,
 
        /* The QoS code on the MDS does not care about space reserved for
         * precreate, so take it out. */
-       if (exp->exp_connect_flags & OBD_CONNECT_MDS) {
+       if (exp_connect_flags(exp) & OBD_CONNECT_MDS) {
                struct filter_export_data *fed;
 
                fed = &obd->obd_self_export->exp_filter_data;
@@ -1115,7 +1128,7 @@ static int ofd_orphans_destroy(const struct lu_env *env,
        }
 
        LASSERT(exp != NULL);
-       skip_orphan = !!(exp->exp_connect_flags & OBD_CONNECT_SKIP_ORPHAN);
+       skip_orphan = !!(exp_connect_flags(exp) & OBD_CONNECT_SKIP_ORPHAN);
 
        last = ofd_seq_last_oid(oseq);
        LCONSOLE_INFO("%s: deleting orphan objects from "LPX64":"LPU64
@@ -1158,7 +1171,8 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
        struct ofd_thread_info  *info;
        obd_seq                 seq = oa->o_seq;
        struct ofd_seq          *oseq;
-       int                      rc = 0, diff;
+       int                     rc = 0, diff;
+       int                     sync_trans = 0;
 
        ENTRY;
 
@@ -1172,8 +1186,9 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
               seq, oa->o_id);
 
        oseq = ofd_seq_load(env, ofd, seq);
-       if (oseq == NULL) {
-               CERROR("%s: Can't find oseq "LPX64"\n", ofd_name(ofd), seq);
+       if (IS_ERR(oseq)) {
+               CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n",
+                      ofd_name(ofd), seq, PTR_ERR(oseq));
                RETURN(-EINVAL);
        }
 
@@ -1233,6 +1248,20 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                        diff = 1; /* shouldn't we create this right now? */
                } else {
                        diff = oa->o_id - ofd_seq_last_oid(oseq);
+                       /* Do sync create if the seq is about to used up */
+                       if (fid_seq_is_idif(oa->o_seq) ||
+                           fid_seq_is_mdt0(oa->o_seq)) {
+                               if (unlikely(oa->o_id >= IDIF_MAX_OID - 1))
+                                       sync_trans = 1;
+                       } else if (fid_seq_is_norm(oa->o_seq)) {
+                               if (unlikely(oa->o_id >=
+                                            LUSTRE_DATA_SEQ_MAX_WIDTH - 1))
+                                       sync_trans = 1;
+                       } else {
+                               CERROR("%s : invalid o_seq "LPX64": rc = %d\n",
+                                      ofd_name(ofd), oa->o_seq, -EINVAL);
+                               GOTO(out, rc = -EINVAL);
+                       }
                }
        }
        if (diff > 0) {
@@ -1273,7 +1302,7 @@ int ofd_create(const struct lu_env *env, struct obd_export *exp,
                        }
 
                        rc = ofd_precreate_objects(env, ofd, next_id,
-                                                  oseq, count);
+                                                  oseq, count, sync_trans);
                        if (rc > 0) {
                                created += rc;
                                diff -= rc;