Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
index 1f48a6c..8191c00 100644 (file)
@@ -61,9 +61,9 @@ int ofd_version_get_check(struct ofd_thread_info *info,
            info->fti_pre_version != curr_version) {
                CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
                       info->fti_pre_version, curr_version);
-               cfs_spin_lock(&info->fti_exp->exp_lock);
+               spin_lock(&info->fti_exp->exp_lock);
                info->fti_exp->exp_vbr_failed = 1;
-               cfs_spin_unlock(&info->fti_exp->exp_lock);
+               spin_unlock(&info->fti_exp->exp_lock);
                RETURN (-EOVERFLOW);
        }
        info->fti_pre_version = curr_version;
@@ -145,35 +145,37 @@ void ofd_object_put(const struct lu_env *env, struct ofd_object *fo)
        lu_object_put(env, &fo->ofo_obj.do_lu);
 }
 
-int ofd_precreate_object(const struct lu_env *env, struct ofd_device *ofd,
-                        obd_id id, obd_seq group)
+int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
+                         obd_id id, obd_seq seq, int nr)
 {
        struct ofd_thread_info  *info = ofd_info(env);
-       struct ofd_object       *fo;
+       struct ofd_object       *fo = NULL;
        struct dt_object        *next;
        struct thandle          *th;
+       struct ofd_object       **batch;
        obd_id                   tmp;
        int                      rc;
+       int                      i;
+       int                      objects = 0;
+       int                      nr_saved = nr;
 
        ENTRY;
 
        /* Don't create objects beyond the valid range for this SEQ */
-       if (unlikely(fid_seq_is_mdt0(group) && id >= IDIF_MAX_OID)) {
+       if (unlikely(fid_seq_is_mdt0(seq) && (id + nr) >= IDIF_MAX_OID)) {
                CERROR("%s:"POSTID" hit the IDIF_MAX_OID (1<<48)!\n",
-                      ofd_name(ofd), id, group);
+                      ofd_name(ofd), id, seq);
                RETURN(rc = -ENOSPC);
-       } else if (unlikely(!fid_seq_is_mdt0(group) && id >= OBIF_MAX_OID)) {
+       } else if (unlikely(!fid_seq_is_mdt0(seq) &&
+                  (id + nr) >= OBIF_MAX_OID)) {
                CERROR("%s:"POSTID" hit the OBIF_MAX_OID (1<<32)!\n",
-                      ofd_name(ofd), id, group);
+                      ofd_name(ofd), id, seq);
                RETURN(rc = -ENOSPC);
        }
-       info->fti_ostid.oi_id = id;
-       info->fti_ostid.oi_seq = group;
-       fid_ostid_unpack(&info->fti_fid, &info->fti_ostid, 0);
 
-       fo = ofd_object_find(env, ofd, &info->fti_fid);
-       if (IS_ERR(fo))
-               RETURN(PTR_ERR(fo));
+       OBD_ALLOC(batch, nr_saved * sizeof(struct ofd_object *));
+       if (batch == NULL)
+               RETURN(-ENOMEM);
 
        info->fti_attr.la_valid = LA_TYPE | LA_MODE;
        /*
@@ -192,61 +194,113 @@ int ofd_precreate_object(const struct lu_env *env, struct ofd_device *ofd,
        info->fti_attr.la_mtime = 0;
        info->fti_attr.la_ctime = 0;
 
-       next = ofd_object_child(fo);
-       LASSERT(next != NULL);
+       /* prepare objects */
+       for (i = 0; i < nr; i++) {
+               info->fti_ostid.oi_id = id + i;
+               info->fti_ostid.oi_seq = seq;
 
+               rc = fid_ostid_unpack(&info->fti_fid, &info->fti_ostid, 0);
+               if (rc) {
+                       if (i == 0)
+                               GOTO(out, rc = PTR_ERR(fo));
+
+                       nr = i;
+                       break;
+               }
+
+               fo = ofd_object_find(env, ofd, &info->fti_fid);
+               if (IS_ERR(fo)) {
+                       if (i == 0)
+                               GOTO(out, rc = PTR_ERR(fo));
+
+                       nr = i;
+                       break;
+               }
+
+               ofd_write_lock(env, fo);
+               batch[i] = fo;
+       }
        info->fti_buf.lb_buf = &tmp;
        info->fti_buf.lb_len = sizeof(tmp);
        info->fti_off = 0;
 
-       ofd_write_lock(env, fo);
        th = ofd_trans_create(env, ofd);
        if (IS_ERR(th))
-               GOTO(out_unlock, rc = PTR_ERR(th));
+               GOTO(out, rc = PTR_ERR(th));
 
-       rc = dt_declare_record_write(env, ofd->ofd_lastid_obj[group],
+       rc = dt_declare_record_write(env, ofd->ofd_lastid_obj[seq],
                                     sizeof(tmp), info->fti_off, th);
        if (rc)
                GOTO(trans_stop, rc);
 
-       if (unlikely(ofd_object_exists(fo))) {
-               /* object may exist being re-created by write replay */
-               CDEBUG(D_INODE, "object %u/"LPD64" exists: "DFID"\n",
-                      (unsigned) group, id, PFID(&info->fti_fid));
-               rc = dt_trans_start_local(env, ofd->ofd_osd, th);
-               if (rc)
-                       GOTO(trans_stop, rc);
-               GOTO(last_id_write, rc);
+       for (i = 0; i < nr; i++) {
+               fo = batch[i];
+               LASSERT(fo);
+
+               if (unlikely(ofd_object_exists(fo))) {
+                       /* object may exist being re-created by write replay */
+                       CDEBUG(D_INODE, "object "LPD64"/"LPD64" exists: "
+                              DFID"\n", seq, id, PFID(&info->fti_fid));
+                       continue;
+               }
+
+               next = ofd_object_child(fo);
+               LASSERT(next != NULL);
+
+               rc = dt_declare_create(env, next, &info->fti_attr, NULL,
+                                      &info->fti_dof, th);
+               if (rc) {
+                       nr = i;
+                       break;
+               }
        }
-       rc = dt_declare_create(env, next, &info->fti_attr, NULL,
-                              &info->fti_dof, th);
-       if (rc)
-               GOTO(trans_stop, rc);
 
        rc = dt_trans_start_local(env, ofd->ofd_osd, th);
        if (rc)
                GOTO(trans_stop, rc);
 
-       CDEBUG(D_OTHER, "create new object %lu:%llu\n",
-              (unsigned long) info->fti_fid.f_oid, info->fti_fid.f_seq);
+       CDEBUG(D_OTHER, "create new object "DFID"\n", PFID(&info->fti_fid));
 
-       rc = dt_create(env, next, &info->fti_attr, NULL, &info->fti_dof, th);
-       if (rc)
-               GOTO(trans_stop, rc);
-       LASSERT(ofd_object_exists(fo));
+       for (i = 0; i < nr; i++) {
+               fo = batch[i];
+               LASSERT(fo);
+
+               if (likely(!ofd_object_exists(fo))) {
+                       next = ofd_object_child(fo);
+                       LASSERT(next != NULL);
 
-last_id_write:
-       ofd_last_id_set(ofd, id, group);
+                       rc = dt_create(env, next, &info->fti_attr, NULL,
+                                      &info->fti_dof, th);
+                       if (rc)
+                               break;
+                       LASSERT(ofd_object_exists(fo));
+               }
+               ofd_last_id_set(ofd, id + i, seq);
+       }
 
-       tmp = cpu_to_le64(ofd_last_id(ofd, group));
-       rc = dt_record_write(env, ofd->ofd_lastid_obj[group], &info->fti_buf,
-                            &info->fti_off, th);
+       objects = i;
+       if (objects > 0) {
+               tmp = cpu_to_le64(ofd_last_id(ofd, seq));
+               rc = dt_record_write(env, ofd->ofd_lastid_obj[seq],
+                                    &info->fti_buf, &info->fti_off, th);
+       }
 trans_stop:
        ofd_trans_stop(env, ofd, th, rc);
-out_unlock:
-       ofd_write_unlock(env, fo);
-       ofd_object_put(env, fo);
-       RETURN(rc);
+out:
+       for (i = 0; i < nr_saved; i++) {
+               fo = batch[i];
+               if (fo) {
+                       ofd_write_unlock(env, fo);
+                       ofd_object_put(env, fo);
+               }
+       }
+       OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *));
+
+       CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
+              "created %d/%d objects: %d\n", objects, nr_saved, rc);
+
+       LASSERT(ergo(objects == 0, rc < 0));
+       RETURN(objects > 0 ? objects : rc);
 }
 
 /*