+/* XXX need AT adjust ? */
+#define osc_create_timeout (obd_timeout / 2)
+
+struct osc_create_async_args {
+ struct osc_creator *rq_oscc;
+ struct lov_stripe_md *rq_lsm;
+ struct obd_info *rq_oinfo;
+};
+
+static int oscc_internal_create(struct osc_creator *oscc);
+static int handle_async_create(struct ptlrpc_request *req, int rc);
+
+static int osc_interpret_create(const struct lu_env *env,
+ struct ptlrpc_request *req, void *data, int rc)
+{
+ struct osc_creator *oscc;
+ struct ost_body *body = NULL;
+ struct ptlrpc_request *fake_req, *pos;
+ ENTRY;
+
+ if (req->rq_repmsg) {
+ body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
+ lustre_swab_ost_body);
+ if (body == NULL && rc == 0)
+ rc = -EPROTO;
+ }
+
+ oscc = req->rq_async_args.pointer_arg[0];
+ LASSERT(oscc && (oscc->oscc_obd != LP_POISON));
+
+ spin_lock(&oscc->oscc_lock);
+ oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
+ switch (rc) {
+ case 0: {
+ if (body) {
+ int diff = body->oa.o_id - oscc->oscc_last_id;
+
+ /* oscc_internal_create() stores the original value of
+ * grow_count in rq_async_args.space[0].
+ * We can't compare against oscc_grow_count directly,
+ * because it may have been increased while the RPC
+ * is in flight, so we would always find ourselves
+ * having created fewer objects and decreasing the
+ * precreate request size. b=18577 */
+ if (diff < (int) req->rq_async_args.space[0]) {
+ /* the OST has not managed to create all the
+ * objects we asked for */
+ oscc->oscc_grow_count = max(diff,
+ OST_MIN_PRECREATE);
+ /* don't bump grow_count next time */
+ oscc->oscc_flags |= OSCC_FLAG_LOW;
+ } else {
+ /* the OST is able to keep up with the work,
+ * we could consider increasing grow_count
+ * next time if needed */
+ oscc->oscc_flags &= ~OSCC_FLAG_LOW;
+ }
+ oscc->oscc_last_id = body->oa.o_id;
+ }
+ spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+ case -ENOSPC:
+ case -EROFS:
+ case -EFBIG: {
+ oscc->oscc_flags |= OSCC_FLAG_NOSPC;
+ if (body && rc == -ENOSPC) {
+ oscc->oscc_grow_count = OST_MIN_PRECREATE;
+ oscc->oscc_last_id = body->oa.o_id;
+ }
+ spin_unlock(&oscc->oscc_lock);
+ DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
+ break;
+ }
+ case -EIO: {
+ /* filter always set body->oa.o_id as the last_id
+ * of filter (see filter_handle_precreate for detail)*/
+ if (body && body->oa.o_id > oscc->oscc_last_id)
+ oscc->oscc_last_id = body->oa.o_id;
+ spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+ case -EWOULDBLOCK: {
+ /* aka EAGAIN we should not delay create if import failed -
+ * this avoid client stick in create and avoid race with
+ * delorphan */
+ oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+ /* oscc->oscc_grow_count = OST_MIN_PRECREATE; */
+ spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+ default: {
+ oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+ oscc->oscc_grow_count = OST_MIN_PRECREATE;
+ spin_unlock(&oscc->oscc_lock);
+ DEBUG_REQ(D_ERROR, req,
+ "Unknown rc %d from async create: failing oscc", rc);
+ ptlrpc_fail_import(req->rq_import,
+ lustre_msg_get_conn_cnt(req->rq_reqmsg));
+ }
+ }
+
+ CDEBUG(D_HA, "preallocated through id "LPU64" (next to use "LPU64")\n",
+ oscc->oscc_last_id, oscc->oscc_next_id);
+
+ spin_lock(&oscc->oscc_lock);
+ list_for_each_entry_safe(fake_req, pos,
+ &oscc->oscc_wait_create_list, rq_list) {
+ if (handle_async_create(fake_req, rc) == -EAGAIN) {
+ oscc_internal_create(oscc);
+ /* sending request should be never fail because
+ * osc use preallocated requests pool */
+ GOTO(exit_wakeup, rc);
+ }
+ }
+ spin_unlock(&oscc->oscc_lock);
+
+exit_wakeup:
+ cfs_waitq_signal(&oscc->oscc_waitq);
+ RETURN(rc);
+}
+
+static int oscc_internal_create(struct osc_creator *oscc)
+{
+ struct ptlrpc_request *request;
+ struct ost_body *body;
+ __u32 size[] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ ENTRY;
+
+ LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
+
+ if(oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+ spin_unlock(&oscc->oscc_lock);
+ RETURN(0);
+ }
+
+ /* we need check it before OSCC_FLAG_CREATING - because need
+ * see lower number of precreate objects */
+ if (oscc->oscc_grow_count < oscc->oscc_max_grow_count &&
+ ((oscc->oscc_flags & OSCC_FLAG_LOW) == 0) &&
+ (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
+ (oscc->oscc_grow_count / 4 + 1)) {
+ oscc->oscc_flags |= OSCC_FLAG_LOW;
+ oscc->oscc_grow_count *= 2;
+ }
+
+ if (oscc->oscc_flags & OSCC_FLAG_CREATING) {
+ spin_unlock(&oscc->oscc_lock);
+ RETURN(0);
+ }
+
+ if (oscc->oscc_grow_count > oscc->oscc_max_grow_count / 2)
+ oscc->oscc_grow_count = oscc->oscc_max_grow_count / 2;
+
+ oscc->oscc_flags |= OSCC_FLAG_CREATING;
+ spin_unlock(&oscc->oscc_lock);
+
+ request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
+ LUSTRE_OST_VERSION, OST_CREATE, 2,
+ size, NULL);
+ if (request == NULL) {
+ spin_lock(&oscc->oscc_lock);
+ oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
+ spin_unlock(&oscc->oscc_lock);
+ RETURN(-ENOMEM);
+ }
+
+ request->rq_request_portal = OST_CREATE_PORTAL;
+ ptlrpc_at_set_req_timeout(request);
+ body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+
+ spin_lock(&oscc->oscc_lock);
+ body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
+ body->oa.o_gr = oscc->oscc_oa.o_gr;
+ LASSERT_MDS_GROUP(body->oa.o_gr);
+ body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
+ request->rq_async_args.space[0] = oscc->oscc_grow_count;
+ spin_unlock(&oscc->oscc_lock);
+ CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
+ body->oa.o_id, oscc->oscc_last_id);
+
+ /* we should not resend create request - anyway we will have delorphan
+ * and kill these objects */
+ request->rq_no_delay = request->rq_no_resend = 1;
+ ptlrpc_req_set_repsize(request, 2, size);
+
+ request->rq_async_args.pointer_arg[0] = oscc;
+ request->rq_interpret_reply = osc_interpret_create;
+ ptlrpcd_add_req(request, PSCOPE_OTHER);
+
+ RETURN(0);
+}
+
+static int oscc_has_objects_nolock(struct osc_creator *oscc, int count)