/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 */
/*
 * Copyright (c) 2013, Intel Corporation.
 */
/*
 * lustre/target/out_lib.c
 *
 * Author: Di Wang <di.wang@intel.com>
 * Author: Fan, Yong <fan.yong@intel.com>
 */
32 #define DEBUG_SUBSYSTEM S_CLASS
34 #include <lu_target.h>
35 #include <lustre_update.h>
38 struct dt_update_request*
39 out_find_update(struct thandle_update *tu, struct dt_device *dt_dev)
41 struct dt_update_request *dt_update;
43 list_for_each_entry(dt_update, &tu->tu_remote_update_list,
45 if (dt_update->dur_dt == dt_dev)
50 EXPORT_SYMBOL(out_find_update);
52 void out_destroy_update_req(struct dt_update_request *dt_update)
54 if (dt_update == NULL)
57 list_del(&dt_update->dur_list);
58 if (dt_update->dur_req != NULL)
59 OBD_FREE_LARGE(dt_update->dur_req, dt_update->dur_req_len);
61 OBD_FREE_PTR(dt_update);
64 EXPORT_SYMBOL(out_destroy_update_req);
66 struct dt_update_request *out_create_update_req(struct dt_device *dt)
68 struct dt_update_request *dt_update;
70 OBD_ALLOC_PTR(dt_update);
72 return ERR_PTR(-ENOMEM);
74 OBD_ALLOC_LARGE(dt_update->dur_req, OUT_UPDATE_INIT_BUFFER_SIZE);
75 if (dt_update->dur_req == NULL) {
76 OBD_FREE_PTR(dt_update);
77 return ERR_PTR(-ENOMEM);
80 dt_update->dur_req_len = OUT_UPDATE_INIT_BUFFER_SIZE;
81 INIT_LIST_HEAD(&dt_update->dur_list);
82 dt_update->dur_dt = dt;
83 dt_update->dur_batchid = 0;
84 dt_update->dur_req->ourq_magic = UPDATE_REQUEST_MAGIC;
85 dt_update->dur_req->ourq_count = 0;
86 INIT_LIST_HEAD(&dt_update->dur_cb_items);
90 EXPORT_SYMBOL(out_create_update_req);
93 * Find or create one loc in th_dev/dev_obj_update for the update,
94 * Because only one thread can access this thandle, no need
97 struct dt_update_request *out_find_create_update_loc(struct thandle *th,
100 struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
101 struct thandle_update *tu = th->th_update;
102 struct dt_update_request *update;
108 RETURN(ERR_PTR(-ENOMEM));
110 INIT_LIST_HEAD(&tu->tu_remote_update_list);
111 tu->tu_sent_after_local_trans = 0;
115 update = out_find_update(tu, dt_dev);
119 update = out_create_update_req(dt_dev);
123 list_add_tail(&update->dur_list, &tu->tu_remote_update_list);
125 if (!tu->tu_only_remote_trans)
130 EXPORT_SYMBOL(out_find_create_update_loc);
132 int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
133 const struct object_update_request *ureq,
134 struct ptlrpc_request **reqp)
136 struct ptlrpc_request *req;
137 struct object_update_request *tmp;
142 req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
146 ureq_len = object_update_request_size(ureq);
147 req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
150 rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
152 ptlrpc_req_finished(req);
156 req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
157 RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
159 tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
160 memcpy(tmp, ureq, ureq_len);
162 ptlrpc_request_set_replen(req);
163 req->rq_request_portal = OUT_PORTAL;
164 req->rq_reply_portal = OSC_REPLY_PORTAL;
169 EXPORT_SYMBOL(out_prep_update_req);
171 int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
172 struct dt_update_request *dt_update,
173 struct ptlrpc_request **reqp)
175 struct ptlrpc_request *req = NULL;
179 rc = out_prep_update_req(env, imp, dt_update->dur_req, &req);
183 /* Note: some dt index api might return non-zero result here, like
184 * osd_index_ea_lookup, so we should only check rc < 0 here */
185 rc = ptlrpc_queue_wait(req);
187 ptlrpc_req_finished(req);
188 dt_update->dur_rc = rc;
197 dt_update->dur_rc = rc;
199 ptlrpc_req_finished(req);
203 EXPORT_SYMBOL(out_remote_sync);
205 static int out_resize_update_req(struct dt_update_request *dt_update,
208 struct object_update_request *ureq;
210 LASSERT(new_size > dt_update->dur_req_len);
212 CDEBUG(D_INFO, "%s: resize update_size from %d to %d\n",
213 dt_update->dur_dt->dd_lu_dev.ld_obd->obd_name,
214 dt_update->dur_req_len, new_size);
216 OBD_ALLOC_LARGE(ureq, new_size);
220 memcpy(ureq, dt_update->dur_req,
221 object_update_request_size(dt_update->dur_req));
223 OBD_FREE_LARGE(dt_update->dur_req, dt_update->dur_req_len);
225 dt_update->dur_req = ureq;
226 dt_update->dur_req_len = new_size;
231 #define OUT_UPDATE_BUFFER_SIZE_ADD 4096
232 #define OUT_UPDATE_BUFFER_SIZE_MAX (64 * 4096) /* 64KB update size now */
234 * Insert the update into the th_bufs for the device.
237 int out_insert_update(const struct lu_env *env,
238 struct dt_update_request *update, int op,
239 const struct lu_fid *fid, int params_count, int *lens,
242 struct object_update_request *ureq = update->dur_req;
244 struct object_update *obj_update;
245 struct object_update_param *param;
252 /* Check update size to make sure it can fit into the buffer */
253 ureq_len = object_update_request_size(ureq);
254 update_length = offsetof(struct object_update, ou_params[0]);
255 for (i = 0; i < params_count; i++)
256 update_length += cfs_size_round(lens[i] + sizeof(*param));
258 if (unlikely(cfs_size_round(ureq_len + update_length) >
259 update->dur_req_len)) {
260 int new_size = update->dur_req_len;
262 /* enlarge object update request size */
264 cfs_size_round(ureq_len + update_length))
265 new_size += OUT_UPDATE_BUFFER_SIZE_ADD;
266 if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX)
269 rc = out_resize_update_req(update, new_size);
273 ureq = update->dur_req;
276 /* fill the update into the update buffer */
277 obj_update = (struct object_update *)((char *)ureq + ureq_len);
278 obj_update->ou_fid = *fid;
279 obj_update->ou_type = op;
280 obj_update->ou_params_count = (__u16)params_count;
281 obj_update->ou_batchid = update->dur_batchid;
282 param = &obj_update->ou_params[0];
283 for (i = 0; i < params_count; i++) {
284 param->oup_len = lens[i];
285 ptr = ¶m->oup_buf[0];
286 memcpy(¶m->oup_buf[0], bufs[i], lens[i]);
287 param = (struct object_update_param *)((char *)param +
288 object_update_param_size(param));
293 CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%d\n",
294 update->dur_dt->dd_lu_dev.ld_obd->obd_name, ureq, PFID(fid),
295 ureq->ourq_count, op, params_count, ureq_len + update_length);
299 EXPORT_SYMBOL(out_insert_update);