Whamcloud - gitweb
LU-4478 ldiskfs: fix problem when ldiskfs_acct_on() fails
[fs/lustre-release.git] / lustre / target / out_lib.c
index c2d41f7..061dac5 100644 (file)
 #include <lustre_update.h>
 #include <obd.h>
 
-struct update_request *out_find_update(struct thandle_update *tu,
-                                      struct dt_device *dt_dev)
+struct dt_update_request*
+out_find_update(struct thandle_update *tu, struct dt_device *dt_dev)
 {
-       struct update_request   *update;
+       struct dt_update_request   *dt_update;
 
-       LASSERT(tu != NULL);
-       list_for_each_entry(update, &tu->tu_remote_update_list, ur_list) {
-               if (update->ur_dt == dt_dev)
-                       return update;
+       list_for_each_entry(dt_update, &tu->tu_remote_update_list,
+                           dur_list) {
+               if (dt_update->dur_dt == dt_dev)
+                       return dt_update;
        }
-
        return NULL;
 }
 EXPORT_SYMBOL(out_find_update);
 
-void out_destroy_update_req(struct update_request *update)
+void out_destroy_update_req(struct dt_update_request *dt_update)
 {
-       if (update == NULL)
+       if (dt_update == NULL)
                return;
 
-       LASSERT(list_empty(&update->ur_cb_items));
-
-       list_del(&update->ur_list);
-       if (update->ur_buf != NULL)
-               OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
+       list_del(&dt_update->dur_list);
+       if (dt_update->dur_req != NULL)
+               OBD_FREE_LARGE(dt_update->dur_req, dt_update->dur_req_len);
 
-       OBD_FREE_PTR(update);
+       OBD_FREE_PTR(dt_update);
+       return;
 }
 EXPORT_SYMBOL(out_destroy_update_req);
 
-struct update_request *out_create_update_req(struct dt_device *dt)
+struct dt_update_request *out_create_update_req(struct dt_device *dt)
 {
-       struct update_request *update;
+       struct dt_update_request *dt_update;
 
-       OBD_ALLOC_PTR(update);
-       if (update == NULL)
+       OBD_ALLOC_PTR(dt_update);
+       if (!dt_update)
                return ERR_PTR(-ENOMEM);
 
-       OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
-       if (update->ur_buf == NULL) {
-               OBD_FREE_PTR(update);
-
+       OBD_ALLOC_LARGE(dt_update->dur_req, OUT_UPDATE_INIT_BUFFER_SIZE);
+       if (dt_update->dur_req == NULL) {
+               OBD_FREE_PTR(dt_update);
                return ERR_PTR(-ENOMEM);
        }
 
-       INIT_LIST_HEAD(&update->ur_list);
-       update->ur_dt = dt;
-       update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
-       update->ur_buf->ub_count = 0;
-       INIT_LIST_HEAD(&update->ur_cb_items);
+       dt_update->dur_req_len = OUT_UPDATE_INIT_BUFFER_SIZE;
+       INIT_LIST_HEAD(&dt_update->dur_list);
+       dt_update->dur_dt = dt;
+       dt_update->dur_batchid = 0;
+       dt_update->dur_req->ourq_magic = UPDATE_REQUEST_MAGIC;
+       dt_update->dur_req->ourq_count = 0;
+       INIT_LIST_HEAD(&dt_update->dur_cb_items);
 
-       return update;
+       return dt_update;
 }
 EXPORT_SYMBOL(out_create_update_req);
 
@@ -95,12 +94,12 @@ EXPORT_SYMBOL(out_create_update_req);
  * Because only one thread can access this thandle, no need
  * lock now.
  */
-struct update_request *out_find_create_update_loc(struct thandle *th,
+struct dt_update_request *out_find_create_update_loc(struct thandle *th,
                                                  struct dt_object *dt)
 {
        struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
        struct thandle_update   *tu = th->th_update;
-       struct update_request   *update;
+       struct dt_update_request *update;
        ENTRY;
 
        if (tu == NULL) {
@@ -121,41 +120,45 @@ struct update_request *out_find_create_update_loc(struct thandle *th,
        if (IS_ERR(update))
                RETURN(update);
 
-       list_add_tail(&update->ur_list, &tu->tu_remote_update_list);
+       list_add_tail(&update->dur_list, &tu->tu_remote_update_list);
 
-       thandle_get(th);
+       if (!tu->tu_only_remote_trans)
+               thandle_get(th);
 
        RETURN(update);
 }
 EXPORT_SYMBOL(out_find_create_update_loc);
 
 int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
-                       const struct update_buf *ubuf, int ubuf_len,
+                       const struct object_update_request *ureq,
                        struct ptlrpc_request **reqp)
 {
-       struct ptlrpc_request  *req;
-       struct update_buf      *tmp;
-       int                     rc;
+       struct ptlrpc_request           *req;
+       struct object_update_request    *tmp;
+       int                             ureq_len;
+       int                             rc;
        ENTRY;
 
-       req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
+       req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
        if (req == NULL)
                RETURN(-ENOMEM);
 
-       req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
-                            UPDATE_BUFFER_SIZE);
+       ureq_len = object_update_request_size(ureq);
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
+                            ureq_len);
 
-       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
        if (rc != 0) {
                ptlrpc_req_finished(req);
                RETURN(rc);
        }
 
-       req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
-                            UPDATE_BUFFER_SIZE);
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
+       memcpy(tmp, ureq, ureq_len);
 
-       tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
-       memcpy(tmp, ubuf, ubuf_len);
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OUT_PORTAL;
        req->rq_reply_portal = OSC_REPLY_PORTAL;
@@ -166,15 +169,14 @@ int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
 EXPORT_SYMBOL(out_prep_update_req);
 
 int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
-                   struct update_request *update,
+                   struct dt_update_request *dt_update,
                    struct ptlrpc_request **reqp)
 {
        struct ptlrpc_request   *req = NULL;
-       int                      rc;
+       int                     rc;
        ENTRY;
 
-       rc = out_prep_update_req(env, imp, update->ur_buf,
-                                UPDATE_BUFFER_SIZE, &req);
+       rc = out_prep_update_req(env, imp, dt_update->dur_req, &req);
        if (rc != 0)
                RETURN(rc);
 
@@ -183,66 +185,115 @@ int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
        rc = ptlrpc_queue_wait(req);
        if (rc < 0) {
                ptlrpc_req_finished(req);
-               update->ur_rc = rc;
+               dt_update->dur_rc = rc;
                RETURN(rc);
        }
 
        if (reqp != NULL) {
                *reqp = req;
-       } else {
-               update->ur_rc = rc;
-               ptlrpc_req_finished(req);
+               RETURN(rc);
        }
 
+       dt_update->dur_rc = rc;
+
+       ptlrpc_req_finished(req);
+
        RETURN(rc);
 }
 EXPORT_SYMBOL(out_remote_sync);
 
-int out_insert_update(const struct lu_env *env, struct update_request *update,
-                     int op, const struct lu_fid *fid, int count,
-                     int *lens, const char **bufs)
+static int out_resize_update_req(struct dt_update_request *dt_update,
+                                int new_size)
 {
-       struct update_buf    *ubuf = update->ur_buf;
-       struct update        *obj_update;
-       char                 *ptr;
-       int                   i;
-       int                   update_length;
-       ENTRY;
+       struct object_update_request *ureq;
 
-       obj_update = (struct update *)((char *)ubuf +
-                     cfs_size_round(update_buf_size(ubuf)));
+       LASSERT(new_size > dt_update->dur_req_len);
 
-       /* Check update size to make sure it can fit into the buffer */
-       update_length = cfs_size_round(offsetof(struct update,
-                                      u_bufs[0]));
-       for (i = 0; i < count; i++)
-               update_length += cfs_size_round(lens[i]);
+       CDEBUG(D_INFO, "%s: resize update_size from %d to %d\n",
+              dt_update->dur_dt->dd_lu_dev.ld_obd->obd_name,
+              dt_update->dur_req_len, new_size);
 
-       if (cfs_size_round(update_buf_size(ubuf)) + update_length >
-           UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS)
-               RETURN(-E2BIG);
+       OBD_ALLOC_LARGE(ureq, new_size);
+       if (ureq == NULL)
+               return -ENOMEM;
 
-       if (count > UPDATE_BUF_COUNT)
-               RETURN(-E2BIG);
+       memcpy(ureq, dt_update->dur_req,
+              object_update_request_size(dt_update->dur_req));
 
-       /* fill the update into the update buffer */
-       fid_cpu_to_le(&obj_update->u_fid, fid);
-       obj_update->u_type = cpu_to_le32(op);
-       obj_update->u_batchid = update->ur_batchid;
-       for (i = 0; i < count; i++)
-               obj_update->u_lens[i] = cpu_to_le32(lens[i]);
+       OBD_FREE_LARGE(dt_update->dur_req, dt_update->dur_req_len);
+
+       dt_update->dur_req = ureq;
+       dt_update->dur_req_len = new_size;
 
-       ptr = (char *)obj_update +
-                       cfs_size_round(offsetof(struct update, u_bufs[0]));
-       for (i = 0; i < count; i++)
-               LOGL(bufs[i], lens[i], ptr);
+       return 0;
+}
+
+#define OUT_UPDATE_BUFFER_SIZE_ADD     4096
+#define OUT_UPDATE_BUFFER_SIZE_MAX     (64 * 4096)  /* 64KB update size now */
+/**
+ * Insert the update into the th_bufs for the device.
+ */
+
+int out_insert_update(const struct lu_env *env,
+                     struct dt_update_request *update, int op,
+                     const struct lu_fid *fid, int params_count, int *lens,
+                     const char **bufs)
+{
+       struct object_update_request    *ureq = update->dur_req;
+       int                             ureq_len;
+       struct object_update            *obj_update;
+       struct object_update_param      *param;
+       int                             update_length;
+       int                             rc = 0;
+       char                            *ptr;
+       int                             i;
+       ENTRY;
 
-       ubuf->ub_count++;
+       /* Check update size to make sure it can fit into the buffer */
+       ureq_len = object_update_request_size(ureq);
+       update_length = offsetof(struct object_update, ou_params[0]);
+       for (i = 0; i < params_count; i++)
+               update_length += cfs_size_round(lens[i] + sizeof(*param));
+
+       if (unlikely(cfs_size_round(ureq_len + update_length) >
+                    update->dur_req_len)) {
+               int new_size = update->dur_req_len;
+
+               /* enlarge object update request size */
+               while (new_size <
+                      cfs_size_round(ureq_len + update_length))
+                       new_size += OUT_UPDATE_BUFFER_SIZE_ADD;
+               if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX)
+                       RETURN(-E2BIG);
+
+               rc = out_resize_update_req(update, new_size);
+               if (rc != 0)
+                       RETURN(rc);
+
+               ureq = update->dur_req;
+       }
 
-       CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
-              update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
-              ubuf->ub_count, op, count, update_buf_size(ubuf));
+       /* fill the update into the update buffer */
+       obj_update = (struct object_update *)((char *)ureq + ureq_len);
+       obj_update->ou_fid = *fid;
+       obj_update->ou_type = op;
+       obj_update->ou_params_count = (__u16)params_count;
+       obj_update->ou_batchid = update->dur_batchid;
+       param = &obj_update->ou_params[0];
+       for (i = 0; i < params_count; i++) {
+               param->oup_len = lens[i];
+               ptr = &param->oup_buf[0];
+               memcpy(&param->oup_buf[0], bufs[i], lens[i]);
+               param = (struct object_update_param *)((char *)param +
+                        object_update_param_size(param));
+       }
 
-       RETURN(0);
+       ureq->ourq_count++;
+
+       CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%d\n",
+              update->dur_dt->dd_lu_dev.ld_obd->obd_name, ureq, PFID(fid),
+              ureq->ourq_count, op, params_count, ureq_len + update_length);
+
+       RETURN(rc);
 }
 EXPORT_SYMBOL(out_insert_update);