+static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
+{
+ struct osc_creator *oscc;
+ struct ost_body *body = NULL;
+ ENTRY;
+
+ if (req->rq_repmsg) {
+ body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
+ lustre_swab_ost_body);
+ if (body == NULL && rc == 0)
+ rc = -EPROTO;
+ }
+
+ oscc = req->rq_async_args.pointer_arg[0];
+ LASSERT(oscc && (oscc->oscc_obd != LP_POISON));
+
+ spin_lock(&oscc->oscc_lock);
+ oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
+ switch (rc) {
+ case 0: {
+ if (body) {
+ int diff = body->oa.o_id - oscc->oscc_last_id;
+
+ if (diff < oscc->oscc_grow_count)
+ oscc->oscc_grow_count =
+ max(diff/3, OST_MIN_PRECREATE);
+ else
+ oscc->oscc_flags &= ~OSCC_FLAG_LOW;
+ oscc->oscc_last_id = body->oa.o_id;
+ }
+ spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+ case -EAGAIN:
+ /* valid race delorphan vs create, or somthing after resend */
+ spin_unlock(&oscc->oscc_lock);
+ DEBUG_REQ(D_INODE, req, "Got EGAIN - resend \n");
+ break;
+ case -ENOSPC:
+ case -EROFS:
+ case -EFBIG: {
+ oscc->oscc_flags |= OSCC_FLAG_NOSPC;
+ if (body && rc == -ENOSPC) {
+ oscc->oscc_grow_count = OST_MIN_PRECREATE;
+ oscc->oscc_last_id = body->oa.o_id;
+ }
+ spin_unlock(&oscc->oscc_lock);
+ DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
+ break;
+ }
+ case -EIO: {
+ /* filter always set body->oa.o_id as the last_id
+ * of filter (see filter_handle_precreate for detail)*/
+ if (body && body->oa.o_id > oscc->oscc_last_id)
+ oscc->oscc_last_id = body->oa.o_id;
+ spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+ default: {
+ oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+ oscc->oscc_grow_count = OST_MIN_PRECREATE;
+ spin_unlock(&oscc->oscc_lock);
+ DEBUG_REQ(D_ERROR, req,
+ "Unknown rc %d from async create: failing oscc", rc);
+ ptlrpc_fail_import(req->rq_import,
+ lustre_msg_get_conn_cnt(req->rq_reqmsg));
+ }
+ }
+
+ CDEBUG(D_HA, "preallocated through id "LPU64" (next to use "LPU64")\n",
+ oscc->oscc_last_id, oscc->oscc_next_id);
+
+ cfs_waitq_signal(&oscc->oscc_waitq);
+ RETURN(rc);
+}
+
+static int oscc_internal_create(struct osc_creator *oscc)
+{
+ struct ptlrpc_request *request;
+ struct ost_body *body;
+ __u32 size[] = { sizeof(struct ptlrpc_body), sizeof(*body) };
+ ENTRY;
+
+ LASSERT_SPIN_LOCKED(&oscc->oscc_lock);
+
+ if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
+ oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
+ spin_unlock(&oscc->oscc_lock);
+ RETURN(0);
+ }
+
+ if (oscc->oscc_grow_count < oscc->oscc_max_grow_count &&
+ ((oscc->oscc_flags & OSCC_FLAG_LOW) == 0) &&
+ (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
+ (oscc->oscc_grow_count / 4 + 1)) {
+ oscc->oscc_flags |= OSCC_FLAG_LOW;
+ oscc->oscc_grow_count *= 2;
+ }
+
+ if (oscc->oscc_grow_count > oscc->oscc_max_grow_count / 2)
+ oscc->oscc_grow_count = oscc->oscc_max_grow_count / 2;
+
+ oscc->oscc_flags |= OSCC_FLAG_CREATING;
+ spin_unlock(&oscc->oscc_lock);
+
+ request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
+ LUSTRE_OST_VERSION, OST_CREATE, 2,
+ size, NULL);
+ if (request == NULL) {
+ spin_lock(&oscc->oscc_lock);
+ oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
+ spin_unlock(&oscc->oscc_lock);
+ RETURN(-ENOMEM);
+ }
+
+ request->rq_request_portal = OST_CREATE_PORTAL;
+ ptlrpc_at_set_req_timeout(request);
+ body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));
+
+ spin_lock(&oscc->oscc_lock);
+ body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
+ body->oa.o_gr = oscc->oscc_oa.o_gr;
+ LASSERT(body->oa.o_gr > 0);
+ body->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
+ spin_unlock(&oscc->oscc_lock);
+ CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
+ body->oa.o_id, oscc->oscc_last_id);
+
+ ptlrpc_req_set_repsize(request, 2, size);
+
+ request->rq_async_args.pointer_arg[0] = oscc;
+ request->rq_interpret_reply = osc_interpret_create;
+ ptlrpcd_add_req(request);
+
+ RETURN(0);
+}
+
+static int oscc_has_objects(struct osc_creator *oscc, int count)
+{
+ int have_objs;
+ spin_lock(&oscc->oscc_lock);
+ have_objs = ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);
+
+ if (!have_objs) {
+ oscc_internal_create(oscc);
+ } else {
+ spin_unlock(&oscc->oscc_lock);
+ }
+
+ return have_objs;
+}
+
+static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
+{
+ int have_objs;
+ int ost_full;
+ int osc_invalid;
+
+ have_objs = oscc_has_objects(oscc, count);
+
+ spin_lock(&oscc->oscc_lock);
+ ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
+ spin_unlock(&oscc->oscc_lock);
+
+ osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;
+
+ return have_objs || ost_full || osc_invalid;
+}
+
+static int oscc_precreate(struct osc_creator *oscc, int wait)
+{
+ struct l_wait_info lwi = { 0 };
+ int rc = 0;
+ ENTRY;
+
+ if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))
+ RETURN(0);
+
+ if (!wait)
+ RETURN(0);
+
+ /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */
+ l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);
+
+ if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))
+ rc = -ENOSPC;
+
+ if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)
+ rc = -EIO;
+
+ RETURN(rc);
+}
+