Whamcloud - gitweb
LU-3534 osp: move RPC pack from declare to execution phase
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <lustre_update.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #define OUT_UPDATE_BUFFER_SIZE_ADD      4096
40 #define OUT_UPDATE_BUFFER_SIZE_MAX      (256 * 4096)  /* 1MB update size now */
41 /**
42  * resize update buffer
43  *
44  * Extend the update buffer by new_size.
45  *
46  * \param[in] ubuf      update buffer to be extended
47  * \param[in] new_size  new size of the update buffer
48  *
49  * \retval              0 if extending succeeds.
50  * \retval              negative errno if extending fails.
51  */
52 static int update_buffer_resize(struct update_buffer *ubuf, size_t new_size)
53 {
54         struct object_update_request *ureq;
55
56         if (new_size > ubuf->ub_req_size)
57                 return 0;
58
59         OBD_ALLOC_LARGE(ureq, new_size);
60         if (ureq == NULL)
61                 return -ENOMEM;
62
63         memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size);
64
65         OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size);
66
67         ubuf->ub_req = ureq;
68         ubuf->ub_req_size = new_size;
69
70         return 0;
71 }
72
73 /**
74  * Pack the header of object_update_request
75  *
76  * Packs updates into the update_buffer header, which will either be sent to
77  * the remote MDT or stored in the local update log. The maximum update buffer
78  * size is 1MB for now.
79  *
80  * \param[in] env       execution environment
81  * \param[in] ubuf      update bufer which it will pack the update in
82  * \param[in] op        update operation
83  * \param[in] fid       object FID for this update
84  * \param[in] param_count       parameters count for this update
85  * \param[in] lens      each parameters length of this update
86  * \param[in] batchid   batchid(transaction no) of this update
87  *
88  * \retval              0 pack update succeed.
89  * \retval              negative errno pack update failed.
90  **/
91 static struct object_update *
92 out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf,
93                        enum update_type op, const struct lu_fid *fid,
94                        int params_count, __u16 *param_sizes, __u64 batchid)
95 {
96         struct object_update_request    *ureq = ubuf->ub_req;
97         size_t                          ureq_size = ubuf->ub_req_size;
98         struct object_update            *obj_update;
99         struct object_update_param      *param;
100         size_t                          update_size;
101         int                             rc = 0;
102         unsigned int                    i;
103         ENTRY;
104
105         /* Check update size to make sure it can fit into the buffer */
106         ureq_size = object_update_request_size(ureq);
107         update_size = offsetof(struct object_update, ou_params[0]);
108         for (i = 0; i < params_count; i++)
109                 update_size += cfs_size_round(param_sizes[i] + sizeof(*param));
110
111         if (unlikely(cfs_size_round(ureq_size + update_size) >
112                      ubuf->ub_req_size)) {
113                 size_t new_size = ubuf->ub_req_size;
114
115                 /* enlarge object update request size */
116                 while (new_size <
117                        cfs_size_round(ureq_size + update_size))
118                         new_size += OUT_UPDATE_BUFFER_SIZE_ADD;
119                 if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX)
120                         RETURN(ERR_PTR(-E2BIG));
121
122                 rc = update_buffer_resize(ubuf, new_size);
123                 if (rc < 0)
124                         RETURN(ERR_PTR(rc));
125
126                 ureq = ubuf->ub_req;
127         }
128
129         /* fill the update into the update buffer */
130         obj_update = (struct object_update *)((char *)ureq + ureq_size);
131         obj_update->ou_fid = *fid;
132         obj_update->ou_type = op;
133         obj_update->ou_params_count = (__u16)params_count;
134         obj_update->ou_batchid = batchid;
135         param = &obj_update->ou_params[0];
136         for (i = 0; i < params_count; i++) {
137                 param->oup_len = param_sizes[i];
138                 param = (struct object_update_param *)((char *)param +
139                          object_update_param_size(param));
140         }
141
142         CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n",
143                ureq, PFID(fid), ureq->ourq_count, op, params_count,
144                (int)update_size);
145         ureq->ourq_count++;
146
147         RETURN(obj_update);
148 }
149
150 /**
151  * Packs one update into the update_buffer.
152  *
153  * \param[in] env       execution environment
154  * \param[in] ubuf      bufer where update will be packed
155  * \param[in] op        update operation (enum update_type)
156  * \param[in] fid       object FID for this update
157  * \param[in] param_count       number of parameters for this update
158  * \param[in] param_sizes       array of parameters length of this update
159  * \param[in] param_bufs        parameter buffers
160  * \param[in] batchid   transaction no of this update, plus mdt_index, which
161  *                      will be globally unique
162  *
163  * \retval              = 0 if updates packing succeeds
164  * \retval              negative errno if updates packing fails
165  **/
166 int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
167                     enum update_type op, const struct lu_fid *fid,
168                     int params_count, __u16 *param_sizes,
169                     const void **param_bufs, __u64 batchid)
170 {
171         struct object_update            *update;
172         struct object_update_param      *param;
173         unsigned int                    i;
174         ENTRY;
175
176         update = out_update_header_pack(env, ubuf, op, fid, params_count,
177                                         param_sizes, batchid);
178         if (IS_ERR(update))
179                 RETURN(PTR_ERR(update));
180
181         param = &update->ou_params[0];
182         for (i = 0; i < params_count; i++) {
183                 memcpy(&param->oup_buf[0], param_bufs[i], param_sizes[i]);
184                 param = (struct object_update_param *)((char *)param +
185                          object_update_param_size(param));
186         }
187
188         RETURN(0);
189 }
190 EXPORT_SYMBOL(out_update_pack);
191
192 /**
193  * Pack various updates into the update_buffer.
194  *
195  * The following functions pack different updates into the update_buffer
196  * So parameters of these API is basically same as its correspondent OSD/OSP
197  * API, for detail description of these parameters see osd_handler.c or
198  * osp_md_object.c.
199  *
200  * \param[in] env       execution environment
201  * \param[in] ubuf      update buffer
202  * \param[in] fid       fid of this object for the update
203  * \param[in] batchid   batch id of this update
204  *
205  * \retval              0 if insertion succeeds.
206  * \retval              negative errno if insertion fails.
207  */
208 int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf,
209                     const struct lu_fid *fid, struct lu_attr *attr,
210                     struct dt_allocation_hint *hint,
211                     struct dt_object_format *dof, __u64 batchid)
212 {
213         struct obdo             *obdo;
214         __u16                   sizes[2] = {sizeof(*obdo), 0};
215         int                     buf_count = 1;
216         const struct lu_fid     *fid1 = NULL;
217         struct object_update    *update;
218         ENTRY;
219
220         if (hint != NULL && hint->dah_parent) {
221                 fid1 = lu_object_fid(&hint->dah_parent->do_lu);
222                 sizes[1] = sizeof(*fid1);
223                 buf_count++;
224         }
225
226         update = out_update_header_pack(env, ubuf, OUT_CREATE, fid,
227                                         buf_count, sizes, batchid);
228         if (IS_ERR(update))
229                 RETURN(PTR_ERR(update));
230
231         obdo = object_update_param_get(update, 0, NULL);
232         obdo->o_valid = 0;
233         obdo_from_la(obdo, attr, attr->la_valid);
234         lustre_set_wire_obdo(NULL, obdo, obdo);
235         if (fid1 != NULL) {
236                 struct lu_fid *fid;
237                 fid = object_update_param_get(update, 1, NULL);
238                 fid_cpu_to_le(fid, fid1);
239         }
240
241         RETURN(0);
242 }
243 EXPORT_SYMBOL(out_create_pack);
244
245 int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
246                      const struct lu_fid *fid, __u64 batchid)
247 {
248         return out_update_pack(env, ubuf, OUT_REF_DEL, fid, 0, NULL, NULL,
249                                batchid);
250 }
251 EXPORT_SYMBOL(out_ref_del_pack);
252
253 int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf,
254                      const struct lu_fid *fid, __u64 batchid)
255 {
256         return out_update_pack(env, ubuf, OUT_REF_ADD, fid, 0, NULL, NULL,
257                                batchid);
258 }
259 EXPORT_SYMBOL(out_ref_add_pack);
260
261 int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
262                       const struct lu_fid *fid, const struct lu_attr *attr,
263                       __u64 batchid)
264 {
265         struct object_update    *update;
266         struct obdo             *obdo;
267         __u16                   size = sizeof(*obdo);
268         ENTRY;
269
270         update = out_update_header_pack(env, ubuf, OUT_ATTR_SET, fid, 1,
271                                         &size, batchid);
272         if (IS_ERR(update))
273                 RETURN(PTR_ERR(update));
274
275         obdo = object_update_param_get(update, 0, NULL);
276         obdo->o_valid = 0;
277         obdo_from_la(obdo, attr, attr->la_valid);
278         lustre_set_wire_obdo(NULL, obdo, obdo);
279
280         RETURN(0);
281 }
282 EXPORT_SYMBOL(out_attr_set_pack);
283
284 int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
285                        const struct lu_fid *fid, const struct lu_buf *buf,
286                        const char *name, int flag, __u64 batchid)
287 {
288         __u16   sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
289         const void *bufs[3] = {(char *)name, (char *)buf->lb_buf,
290                                (char *)&flag};
291
292         return out_update_pack(env, ubuf, OUT_XATTR_SET, fid,
293                                ARRAY_SIZE(sizes), sizes, bufs, batchid);
294 }
295 EXPORT_SYMBOL(out_xattr_set_pack);
296
297 int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
298                        const struct lu_fid *fid, const char *name,
299                        __u64 batchid)
300 {
301         __u16   size = strlen(name) + 1;
302
303         return out_update_pack(env, ubuf, OUT_XATTR_DEL, fid, 1, &size,
304                                (const void **)&name, batchid);
305 }
306 EXPORT_SYMBOL(out_xattr_del_pack);
307
308
309 int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf,
310                           const struct lu_fid *fid, const struct dt_rec *rec,
311                           const struct dt_key *key, __u64 batchid)
312 {
313         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
314         struct lu_fid              rec_fid;
315         __u32                       type = cpu_to_le32(rec1->rec_type);
316         __u16                       sizes[3] = { strlen((char *)key) + 1,
317                                                 sizeof(rec_fid),
318                                                 sizeof(type) };
319         const void                 *bufs[3] = { (char *)key,
320                                                 (char *)&rec_fid,
321                                                 (char *)&type };
322
323         fid_cpu_to_le(&rec_fid, rec1->rec_fid);
324
325         return out_update_pack(env, ubuf, OUT_INDEX_INSERT, fid,
326                                ARRAY_SIZE(sizes), sizes, bufs, batchid);
327 }
328 EXPORT_SYMBOL(out_index_insert_pack);
329
330 int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf,
331                           const struct lu_fid *fid, const struct dt_key *key,
332                           __u64 batchid)
333 {
334         __u16   size = strlen((char *)key) + 1;
335         const void *buf = key;
336
337         return out_update_pack(env, ubuf, OUT_INDEX_DELETE, fid, 1, &size,
338                                &buf, batchid);
339 }
340 EXPORT_SYMBOL(out_index_delete_pack);
341
342 int out_object_destroy_pack(const struct lu_env *env,
343                             struct update_buffer *ubuf,
344                             const struct lu_fid *fid, __u64 batchid)
345 {
346         return out_update_pack(env, ubuf, OUT_DESTROY, fid, 0, NULL, NULL,
347                                batchid);
348 }
349 EXPORT_SYMBOL(out_object_destroy_pack);
350
351 int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf,
352                    const struct lu_fid *fid, const struct lu_buf *buf,
353                    loff_t pos, __u64 batchid)
354 {
355         __u16           sizes[2] = {buf->lb_len, sizeof(pos)};
356         const void      *bufs[2] = {(char *)buf->lb_buf, (char *)&pos};
357         int             rc;
358
359         pos = cpu_to_le64(pos);
360
361         rc = out_update_pack(env, ubuf, OUT_WRITE, fid, ARRAY_SIZE(sizes),
362                              sizes, bufs, batchid);
363         return rc;
364 }
365 EXPORT_SYMBOL(out_write_pack);
366
367 /**
368  * Pack various readonly updates into the update_buffer.
369  *
370  * The following update funcs are only used by read-only ops, lookup,
371  * getattr etc, so it does not need transaction here. Currently they
372  * are only used by OSP.
373  *
374  * \param[in] env       execution environment
375  * \param[in] fid       fid of this object for the update
376  * \param[in] ubuf      update buffer
377  *
378  * \retval              0 if packing succeeds.
379  * \retval              negative errno if packing fails.
380  */
381 int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf,
382                           const struct lu_fid *fid, struct dt_rec *rec,
383                           const struct dt_key *key)
384 {
385         const void      *name = key;
386         __u16           size = strlen((char *)name) + 1;
387
388         return out_update_pack(env, ubuf, OUT_INDEX_LOOKUP, fid, 1, &size,
389                                &name, 0);
390 }
391 EXPORT_SYMBOL(out_index_lookup_pack);
392
393 int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
394                       const struct lu_fid *fid)
395 {
396         return out_update_pack(env, ubuf, OUT_ATTR_GET, fid, 0, NULL, NULL, 0);
397 }
398 EXPORT_SYMBOL(out_attr_get_pack);
399
400 int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
401                        const struct lu_fid *fid, const char *name)
402 {
403         __u16 size;
404
405         LASSERT(name != NULL);
406         size = strlen(name) + 1;
407         return out_update_pack(env, ubuf, OUT_XATTR_GET, fid, 1, &size,
408                                (const void **)&name, 0);
409 }
410 EXPORT_SYMBOL(out_xattr_get_pack);