Whamcloud - gitweb
LU-6473 mdt: enqueue lookup lock on the parent MDT
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <lustre_update.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #define OUT_UPDATE_BUFFER_SIZE_ADD      4096
40 #define OUT_UPDATE_BUFFER_SIZE_MAX      (256 * 4096)  /* 1MB update size now */
41
42 const char *update_op_str(__u16 opc)
43 {
44         static const char *opc_str[] = {
45                 [OUT_START] = "start",
46                 [OUT_CREATE] = "create",
47                 [OUT_DESTROY] = "destroy",
48                 [OUT_REF_ADD] = "ref_add",
49                 [OUT_REF_DEL] = "ref_del" ,
50                 [OUT_ATTR_SET] = "attr_set",
51                 [OUT_ATTR_GET] = "attr_get",
52                 [OUT_XATTR_SET] = "xattr_set",
53                 [OUT_XATTR_GET] = "xattr_get",
54                 [OUT_INDEX_LOOKUP] = "lookup",
55                 [OUT_INDEX_INSERT] = "insert",
56                 [OUT_INDEX_DELETE] = "delete",
57                 [OUT_WRITE] = "write",
58                 [OUT_XATTR_DEL] = "xattr_del",
59                 [OUT_PUNCH] = "punch",
60                 [OUT_READ] = "read",
61         };
62
63         if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
64                 return opc_str[opc];
65         else
66                 return "unknown";
67 }
68 EXPORT_SYMBOL(update_op_str);
69
70 /**
71  * Fill object update header
72  *
73  * Only fill the object update header, and parameters will be filled later
74  * in other functions.
75  *
76  * \params[in] env              execution environment
77  * \params[in] update           object update to be filled
78  * \params[in] max_update_size  maximum object update size, if the current
79  *                              update length equals or exceeds the size, it
80  *                              will return -E2BIG.
81  * \params[in] update_op        update type
82  * \params[in] fid              object FID of the update
83  * \params[in] params_count     the count of the update parameters
84  * \params[in] params_sizes     the length of each parameters
85  *
86  * \retval                      0 if packing succeeds.
87  * \retval                      -E2BIG if packing exceeds the maximum length.
88  */
89 int out_update_header_pack(const struct lu_env *env,
90                            struct object_update *update, size_t max_update_size,
91                            enum update_type update_op, const struct lu_fid *fid,
92                            unsigned int param_count, __u16 *params_sizes)
93 {
94         struct object_update_param      *param;
95         unsigned int                    i;
96         size_t                          update_size;
97
98         /* Check whether the packing exceeding the maxima update length */
99         update_size = sizeof(*update);
100         for (i = 0; i < param_count; i++)
101                 update_size += cfs_size_round(sizeof(*param) + params_sizes[i]);
102
103         if (unlikely(update_size >= max_update_size))
104                 return -E2BIG;
105
106         update->ou_fid = *fid;
107         update->ou_type = update_op;
108         update->ou_params_count = param_count;
109         param = &update->ou_params[0];
110         for (i = 0; i < param_count; i++) {
111                 param->oup_len = params_sizes[i];
112                 param = (struct object_update_param *)((char *)param +
113                          object_update_param_size(param));
114         }
115
116         return 0;
117 }
118
119 /**
120  * Packs one update into the update_buffer.
121  *
122  * \param[in] env       execution environment
123  * \param[in] update    update to be packed
124  * \param[in] max_update_size   *maximum size of \a update
125  * \param[in] op        update operation (enum update_type)
126  * \param[in] fid       object FID for this update
127  * \param[in] param_count       number of parameters for this update
128  * \param[in] param_sizes       array of parameters length of this update
129  * \param[in] param_bufs        parameter buffers
130  *
131  * \retval              = 0 if updates packing succeeds
132  * \retval              negative errno if updates packing fails
133  **/
134 int out_update_pack(const struct lu_env *env, struct object_update *update,
135                     size_t max_update_size, enum update_type op,
136                     const struct lu_fid *fid, unsigned int param_count,
137                     __u16 *param_sizes, const void **param_bufs)
138 {
139         struct object_update_param      *param;
140         unsigned int                    i;
141         int                             rc;
142         ENTRY;
143
144         rc = out_update_header_pack(env, update, max_update_size, op, fid,
145                                     param_count, param_sizes);
146         if (rc != 0)
147                 RETURN(rc);
148
149         param = &update->ou_params[0];
150         for (i = 0; i < param_count; i++) {
151                 memcpy(&param->oup_buf[0], param_bufs[i], param_sizes[i]);
152                 param = (struct object_update_param *)((char *)param +
153                          object_update_param_size(param));
154         }
155
156         RETURN(0);
157 }
158 EXPORT_SYMBOL(out_update_pack);
159
160 /**
161  * Pack various updates into the update_buffer.
162  *
163  * The following functions pack different updates into the update_buffer
164  * So parameters of these API is basically same as its correspondent OSD/OSP
165  * API, for detail description of these parameters see osd_handler.c or
166  * osp_md_object.c.
167  *
168  * \param[in] env       execution environment
169  * \param[in] ubuf      update buffer
170  * \param[in] fid       fid of this object for the update
171  *
172  * \retval              0 if insertion succeeds.
173  * \retval              negative errno if insertion fails.
174  */
175 int out_create_pack(const struct lu_env *env, struct object_update *update,
176                     size_t max_update_size, const struct lu_fid *fid,
177                     const struct lu_attr *attr, struct dt_allocation_hint *hint,
178                     struct dt_object_format *dof)
179 {
180         struct obdo             *obdo;
181         __u16                   sizes[2] = {sizeof(*obdo), 0};
182         int                     buf_count = 1;
183         const struct lu_fid     *parent_fid = NULL;
184         int                     rc;
185         ENTRY;
186
187         if (hint != NULL && hint->dah_parent) {
188                 parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
189                 sizes[1] = sizeof(*parent_fid);
190                 buf_count++;
191         }
192
193         rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE,
194                                     fid, buf_count, sizes);
195         if (rc != 0)
196                 RETURN(rc);
197
198         obdo = object_update_param_get(update, 0, NULL);
199         LASSERT(obdo != NULL);
200         obdo->o_valid = 0;
201         obdo_from_la(obdo, attr, attr->la_valid);
202         lustre_set_wire_obdo(NULL, obdo, obdo);
203
204         if (parent_fid != NULL) {
205                 struct lu_fid *tmp;
206
207                 tmp = object_update_param_get(update, 1, NULL);
208                 LASSERT(tmp != NULL);
209                 fid_cpu_to_le(tmp, parent_fid);
210         }
211
212         RETURN(0);
213 }
214 EXPORT_SYMBOL(out_create_pack);
215
216 int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
217                      size_t max_update_size, const struct lu_fid *fid)
218 {
219         return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
220                                0, NULL, NULL);
221 }
222 EXPORT_SYMBOL(out_ref_del_pack);
223
224 int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
225                      size_t max_update_size, const struct lu_fid *fid)
226 {
227         return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
228                                0, NULL, NULL);
229 }
230 EXPORT_SYMBOL(out_ref_add_pack);
231
232 int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
233                       size_t max_update_size, const struct lu_fid *fid,
234                       const struct lu_attr *attr)
235 {
236         struct obdo             *obdo;
237         __u16                   size = sizeof(*obdo);
238         int                     rc;
239         ENTRY;
240
241         rc = out_update_header_pack(env, update, max_update_size,
242                                     OUT_ATTR_SET, fid, 1, &size);
243         if (rc != 0)
244                 RETURN(rc);
245
246         obdo = object_update_param_get(update, 0, NULL);
247         LASSERT(obdo != NULL);
248         obdo->o_valid = 0;
249         obdo_from_la(obdo, attr, attr->la_valid);
250         lustre_set_wire_obdo(NULL, obdo, obdo);
251
252         RETURN(0);
253 }
254 EXPORT_SYMBOL(out_attr_set_pack);
255
256 int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
257                        size_t max_update_size, const struct lu_fid *fid,
258                        const struct lu_buf *buf, const char *name, __u32 flag)
259 {
260         __u16   sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
261         const void *bufs[3] = {(char *)name, (char *)buf->lb_buf,
262                                (char *)&flag};
263
264         return out_update_pack(env, update, max_update_size, OUT_XATTR_SET,
265                                fid, ARRAY_SIZE(sizes), sizes, bufs);
266 }
267 EXPORT_SYMBOL(out_xattr_set_pack);
268
269 int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
270                        size_t max_update_size, const struct lu_fid *fid,
271                        const char *name)
272 {
273         __u16   size = strlen(name) + 1;
274
275         return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL,
276                                fid, 1, &size, (const void **)&name);
277 }
278 EXPORT_SYMBOL(out_xattr_del_pack);
279
280
281 int out_index_insert_pack(const struct lu_env *env,
282                           struct object_update *update,
283                           size_t max_update_size, const struct lu_fid *fid,
284                           const struct dt_rec *rec, const struct dt_key *key)
285 {
286         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
287         struct lu_fid              rec_fid;
288         __u32                       type = cpu_to_le32(rec1->rec_type);
289         __u16                       sizes[3] = { strlen((char *)key) + 1,
290                                                 sizeof(rec_fid),
291                                                 sizeof(type) };
292         const void                 *bufs[3] = { (char *)key,
293                                                 (char *)&rec_fid,
294                                                 (char *)&type };
295
296         fid_cpu_to_le(&rec_fid, rec1->rec_fid);
297
298         return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT,
299                                fid, ARRAY_SIZE(sizes), sizes, bufs);
300 }
301 EXPORT_SYMBOL(out_index_insert_pack);
302
303 int out_index_delete_pack(const struct lu_env *env,
304                           struct object_update *update,
305                           size_t max_update_size, const struct lu_fid *fid,
306                           const struct dt_key *key)
307 {
308         __u16   size = strlen((char *)key) + 1;
309         const void *buf = key;
310
311         return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE,
312                                fid, 1, &size, &buf);
313 }
314 EXPORT_SYMBOL(out_index_delete_pack);
315
316 int out_object_destroy_pack(const struct lu_env *env,
317                             struct object_update *update,
318                             size_t max_update_size, const struct lu_fid *fid)
319 {
320         return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
321                                0, NULL, NULL);
322 }
323 EXPORT_SYMBOL(out_object_destroy_pack);
324
325 int out_write_pack(const struct lu_env *env, struct object_update *update,
326                    size_t max_update_size, const struct lu_fid *fid,
327                    const struct lu_buf *buf, __u64 pos)
328 {
329         __u16           sizes[2] = {buf->lb_len, sizeof(pos)};
330         const void      *bufs[2] = {(char *)buf->lb_buf, (char *)&pos};
331         int             rc;
332
333         pos = cpu_to_le64(pos);
334
335         rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid,
336                              ARRAY_SIZE(sizes), sizes, bufs);
337         return rc;
338 }
339 EXPORT_SYMBOL(out_write_pack);
340
341 /**
342  * Pack various readonly updates into the update_buffer.
343  *
344  * The following update funcs are only used by read-only ops, lookup,
345  * getattr etc, so it does not need transaction here. Currently they
346  * are only used by OSP.
347  *
348  * \param[in] env       execution environment
349  * \param[in] fid       fid of this object for the update
350  * \param[in] ubuf      update buffer
351  *
352  * \retval              = 0 pack succeed.
353  *                      < 0 pack failed.
354  **/
355 int out_index_lookup_pack(const struct lu_env *env,
356                           struct object_update *update,
357                           size_t max_update_size, const struct lu_fid *fid,
358                           struct dt_rec *rec, const struct dt_key *key)
359 {
360         const void      *name = key;
361         __u16           size = strlen((char *)name) + 1;
362
363         return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP,
364                                fid, 1, &size, &name);
365 }
366 EXPORT_SYMBOL(out_index_lookup_pack);
367
368 int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
369                       size_t max_update_size, const struct lu_fid *fid)
370 {
371         return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
372                                fid, 0, NULL, NULL);
373 }
374 EXPORT_SYMBOL(out_attr_get_pack);
375
376 int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
377                        size_t max_update_size, const struct lu_fid *fid,
378                        const char *name)
379 {
380         __u16 size;
381
382         LASSERT(name != NULL);
383         size = strlen(name) + 1;
384
385         return out_update_pack(env, update, max_update_size, OUT_XATTR_GET,
386                                fid, 1, &size, (const void **)&name);
387 }
388 EXPORT_SYMBOL(out_xattr_get_pack);
389
390 int out_read_pack(const struct lu_env *env, struct object_update *update,
391                   size_t max_update_length, const struct lu_fid *fid,
392                   size_t size, loff_t pos)
393 {
394         __u16           sizes[2] = {sizeof(size), sizeof(pos)};
395         const void      *bufs[2] = {&size, &pos};
396
397         size = cpu_to_le64(size);
398         pos = cpu_to_le64(pos);
399
400         return out_update_pack(env, update, max_update_length, OUT_READ, fid,
401                                ARRAY_SIZE(sizes), sizes, bufs);
402 }
403 EXPORT_SYMBOL(out_read_pack);