Whamcloud - gitweb
8db03c1fbbf1f3cf4557b012707beb6e6284a5b7
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <lustre_update.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #define OUT_UPDATE_BUFFER_SIZE_ADD      4096
40 #define OUT_UPDATE_BUFFER_SIZE_MAX      (256 * 4096)  /* 1MB update size now */
41
42 const char *update_op_str(__u16 opc)
43 {
44         static const char *opc_str[] = {
45                 [OUT_START] = "start",
46                 [OUT_CREATE] = "create",
47                 [OUT_DESTROY] = "destroy",
48                 [OUT_REF_ADD] = "ref_add",
49                 [OUT_REF_DEL] = "ref_del" ,
50                 [OUT_ATTR_SET] = "attr_set",
51                 [OUT_ATTR_GET] = "attr_get",
52                 [OUT_XATTR_SET] = "xattr_set",
53                 [OUT_XATTR_GET] = "xattr_get",
54                 [OUT_INDEX_LOOKUP] = "lookup",
55                 [OUT_INDEX_INSERT] = "insert",
56                 [OUT_INDEX_DELETE] = "delete",
57                 [OUT_WRITE] = "write",
58                 [OUT_XATTR_DEL] = "xattr_del",
59         };
60
61         if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
62                 return opc_str[opc];
63         else
64                 return "unknown";
65 }
66 EXPORT_SYMBOL(update_op_str);
67
68 /**
69  * Fill object update header
70  *
71  * Only fill the object update header, and parameters will be filled later
72  * in other functions.
73  *
74  * \params[in] env              execution environment
75  * \params[in] update           object update to be filled
76  * \params[in] max_update_size  maximum object update size, if the current
77  *                              update length equals or exceeds the size, it
78  *                              will return -E2BIG.
79  * \params[in] update_op        update type
80  * \params[in] fid              object FID of the update
81  * \params[in] params_count     the count of the update parameters
82  * \params[in] params_sizes     the length of each parameters
83  *
84  * \retval                      0 if packing succeeds.
85  * \retval                      -E2BIG if packing exceeds the maximum length.
86  */
87 int out_update_header_pack(const struct lu_env *env,
88                            struct object_update *update, size_t max_update_size,
89                            enum update_type update_op, const struct lu_fid *fid,
90                            unsigned int param_count, __u16 *params_sizes)
91 {
92         struct object_update_param      *param;
93         unsigned int                    i;
94         size_t                          update_size;
95
96         /* Check whether the packing exceeding the maxima update length */
97         update_size = sizeof(*update);
98         for (i = 0; i < param_count; i++)
99                 update_size += cfs_size_round(sizeof(*param) + params_sizes[i]);
100
101         if (unlikely(update_size >= max_update_size))
102                 return -E2BIG;
103
104         update->ou_fid = *fid;
105         update->ou_type = update_op;
106         update->ou_params_count = param_count;
107         param = &update->ou_params[0];
108         for (i = 0; i < param_count; i++) {
109                 param->oup_len = params_sizes[i];
110                 param = (struct object_update_param *)((char *)param +
111                          object_update_param_size(param));
112         }
113
114         return 0;
115 }
116
117 /**
118  * Packs one update into the update_buffer.
119  *
120  * \param[in] env       execution environment
121  * \param[in] update    update to be packed
122  * \param[in] max_update_size   *maximum size of \a update
123  * \param[in] op        update operation (enum update_type)
124  * \param[in] fid       object FID for this update
125  * \param[in] param_count       number of parameters for this update
126  * \param[in] param_sizes       array of parameters length of this update
127  * \param[in] param_bufs        parameter buffers
128  *
129  * \retval              = 0 if updates packing succeeds
130  * \retval              negative errno if updates packing fails
131  **/
132 int out_update_pack(const struct lu_env *env, struct object_update *update,
133                     size_t max_update_size, enum update_type op,
134                     const struct lu_fid *fid, unsigned int param_count,
135                     __u16 *param_sizes, const void **param_bufs)
136 {
137         struct object_update_param      *param;
138         unsigned int                    i;
139         int                             rc;
140         ENTRY;
141
142         rc = out_update_header_pack(env, update, max_update_size, op, fid,
143                                     param_count, param_sizes);
144         if (rc != 0)
145                 RETURN(rc);
146
147         param = &update->ou_params[0];
148         for (i = 0; i < param_count; i++) {
149                 memcpy(&param->oup_buf[0], param_bufs[i], param_sizes[i]);
150                 param = (struct object_update_param *)((char *)param +
151                          object_update_param_size(param));
152         }
153
154         RETURN(0);
155 }
156 EXPORT_SYMBOL(out_update_pack);
157
158 /**
159  * Pack various updates into the update_buffer.
160  *
161  * The following functions pack different updates into the update_buffer
162  * So parameters of these API is basically same as its correspondent OSD/OSP
163  * API, for detail description of these parameters see osd_handler.c or
164  * osp_md_object.c.
165  *
166  * \param[in] env       execution environment
167  * \param[in] ubuf      update buffer
168  * \param[in] fid       fid of this object for the update
169  *
170  * \retval              0 if insertion succeeds.
171  * \retval              negative errno if insertion fails.
172  */
173 int out_create_pack(const struct lu_env *env, struct object_update *update,
174                     size_t max_update_size, const struct lu_fid *fid,
175                     const struct lu_attr *attr, struct dt_allocation_hint *hint,
176                     struct dt_object_format *dof)
177 {
178         struct obdo             *obdo;
179         __u16                   sizes[2] = {sizeof(*obdo), 0};
180         int                     buf_count = 1;
181         const struct lu_fid     *parent_fid = NULL;
182         int                     rc;
183         ENTRY;
184
185         if (hint != NULL && hint->dah_parent) {
186                 parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
187                 sizes[1] = sizeof(*parent_fid);
188                 buf_count++;
189         }
190
191         rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE,
192                                     fid, buf_count, sizes);
193         if (rc != 0)
194                 RETURN(rc);
195
196         obdo = object_update_param_get(update, 0, NULL);
197         LASSERT(obdo != NULL);
198         obdo->o_valid = 0;
199         obdo_from_la(obdo, attr, attr->la_valid);
200         lustre_set_wire_obdo(NULL, obdo, obdo);
201
202         if (parent_fid != NULL) {
203                 struct lu_fid *tmp;
204
205                 tmp = object_update_param_get(update, 1, NULL);
206                 LASSERT(tmp != NULL);
207                 fid_cpu_to_le(tmp, parent_fid);
208         }
209
210         RETURN(0);
211 }
212 EXPORT_SYMBOL(out_create_pack);
213
214 int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
215                      size_t max_update_size, const struct lu_fid *fid)
216 {
217         return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
218                                0, NULL, NULL);
219 }
220 EXPORT_SYMBOL(out_ref_del_pack);
221
222 int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
223                      size_t max_update_size, const struct lu_fid *fid)
224 {
225         return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
226                                0, NULL, NULL);
227 }
228 EXPORT_SYMBOL(out_ref_add_pack);
229
230 int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
231                       size_t max_update_size, const struct lu_fid *fid,
232                       const struct lu_attr *attr)
233 {
234         struct obdo             *obdo;
235         __u16                   size = sizeof(*obdo);
236         int                     rc;
237         ENTRY;
238
239         rc = out_update_header_pack(env, update, max_update_size,
240                                     OUT_ATTR_SET, fid, 1, &size);
241         if (rc != 0)
242                 RETURN(rc);
243
244         obdo = object_update_param_get(update, 0, NULL);
245         LASSERT(obdo != NULL);
246         obdo->o_valid = 0;
247         obdo_from_la(obdo, attr, attr->la_valid);
248         lustre_set_wire_obdo(NULL, obdo, obdo);
249
250         RETURN(0);
251 }
252 EXPORT_SYMBOL(out_attr_set_pack);
253
254 int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
255                        size_t max_update_size, const struct lu_fid *fid,
256                        const struct lu_buf *buf, const char *name, __u32 flag)
257 {
258         __u16   sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
259         const void *bufs[3] = {(char *)name, (char *)buf->lb_buf,
260                                (char *)&flag};
261
262         return out_update_pack(env, update, max_update_size, OUT_XATTR_SET,
263                                fid, ARRAY_SIZE(sizes), sizes, bufs);
264 }
265 EXPORT_SYMBOL(out_xattr_set_pack);
266
267 int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
268                        size_t max_update_size, const struct lu_fid *fid,
269                        const char *name)
270 {
271         __u16   size = strlen(name) + 1;
272
273         return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL,
274                                fid, 1, &size, (const void **)&name);
275 }
276 EXPORT_SYMBOL(out_xattr_del_pack);
277
278
279 int out_index_insert_pack(const struct lu_env *env,
280                           struct object_update *update,
281                           size_t max_update_size, const struct lu_fid *fid,
282                           const struct dt_rec *rec, const struct dt_key *key)
283 {
284         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
285         struct lu_fid              rec_fid;
286         __u32                       type = cpu_to_le32(rec1->rec_type);
287         __u16                       sizes[3] = { strlen((char *)key) + 1,
288                                                 sizeof(rec_fid),
289                                                 sizeof(type) };
290         const void                 *bufs[3] = { (char *)key,
291                                                 (char *)&rec_fid,
292                                                 (char *)&type };
293
294         fid_cpu_to_le(&rec_fid, rec1->rec_fid);
295
296         return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT,
297                                fid, ARRAY_SIZE(sizes), sizes, bufs);
298 }
299 EXPORT_SYMBOL(out_index_insert_pack);
300
301 int out_index_delete_pack(const struct lu_env *env,
302                           struct object_update *update,
303                           size_t max_update_size, const struct lu_fid *fid,
304                           const struct dt_key *key)
305 {
306         __u16   size = strlen((char *)key) + 1;
307         const void *buf = key;
308
309         return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE,
310                                fid, 1, &size, &buf);
311 }
312 EXPORT_SYMBOL(out_index_delete_pack);
313
314 int out_object_destroy_pack(const struct lu_env *env,
315                             struct object_update *update,
316                             size_t max_update_size, const struct lu_fid *fid)
317 {
318         return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
319                                0, NULL, NULL);
320 }
321 EXPORT_SYMBOL(out_object_destroy_pack);
322
323 int out_write_pack(const struct lu_env *env, struct object_update *update,
324                    size_t max_update_size, const struct lu_fid *fid,
325                    const struct lu_buf *buf, __u64 pos)
326 {
327         __u16           sizes[2] = {buf->lb_len, sizeof(pos)};
328         const void      *bufs[2] = {(char *)buf->lb_buf, (char *)&pos};
329         int             rc;
330
331         pos = cpu_to_le64(pos);
332
333         rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid,
334                              ARRAY_SIZE(sizes), sizes, bufs);
335         return rc;
336 }
337 EXPORT_SYMBOL(out_write_pack);
338
339 /**
340  * Pack various readonly updates into the update_buffer.
341  *
342  * The following update funcs are only used by read-only ops, lookup,
343  * getattr etc, so it does not need transaction here. Currently they
344  * are only used by OSP.
345  *
346  * \param[in] env       execution environment
347  * \param[in] fid       fid of this object for the update
348  * \param[in] ubuf      update buffer
349  *
350  * \retval              = 0 pack succeed.
351  *                      < 0 pack failed.
352  **/
353 int out_index_lookup_pack(const struct lu_env *env,
354                           struct object_update *update,
355                           size_t max_update_size, const struct lu_fid *fid,
356                           struct dt_rec *rec, const struct dt_key *key)
357 {
358         const void      *name = key;
359         __u16           size = strlen((char *)name) + 1;
360
361         return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP,
362                                fid, 1, &size, &name);
363 }
364 EXPORT_SYMBOL(out_index_lookup_pack);
365
366 int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
367                       size_t max_update_size, const struct lu_fid *fid)
368 {
369         return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
370                                fid, 0, NULL, NULL);
371 }
372 EXPORT_SYMBOL(out_attr_get_pack);
373
374 int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
375                        size_t max_update_size, const struct lu_fid *fid,
376                        const char *name)
377 {
378         __u16 size;
379
380         LASSERT(name != NULL);
381         size = strlen(name) + 1;
382
383         return out_update_pack(env, update, max_update_size, OUT_XATTR_GET,
384                                fid, 1, &size, (const void **)&name);
385 }
386 EXPORT_SYMBOL(out_xattr_get_pack);