Whamcloud - gitweb
dce9da9139deca528f3d0097b6c14473feaa5f0e
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <lustre_update.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #define OUT_UPDATE_BUFFER_SIZE_ADD      4096
40 #define OUT_UPDATE_BUFFER_SIZE_MAX      (256 * 4096)  /* 1MB update size now */
41
42 struct dt_update_request*
43 out_find_update(struct thandle_update *tu, struct dt_device *dt_dev)
44 {
45         struct dt_update_request   *dt_update;
46
47         list_for_each_entry(dt_update, &tu->tu_remote_update_list,
48                             dur_list) {
49                 if (dt_update->dur_dt == dt_dev)
50                         return dt_update;
51         }
52         return NULL;
53 }
54 EXPORT_SYMBOL(out_find_update);
55
56 static struct object_update_request *object_update_request_alloc(size_t size)
57 {
58         struct object_update_request *ourq;
59
60         OBD_ALLOC_LARGE(ourq, size);
61         if (ourq == NULL)
62                 RETURN(ERR_PTR(-ENOMEM));
63
64         ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
65         ourq->ourq_count = 0;
66
67         RETURN(ourq);
68 }
69
70 static void object_update_request_free(struct object_update_request *ourq,
71                                        size_t ourq_size)
72 {
73         if (ourq != NULL)
74                 OBD_FREE_LARGE(ourq, ourq_size);
75 }
76
77 void dt_update_request_destroy(struct dt_update_request *dt_update)
78 {
79         if (dt_update == NULL)
80                 return;
81
82         list_del(&dt_update->dur_list);
83
84         object_update_request_free(dt_update->dur_buf.ub_req,
85                                    dt_update->dur_buf.ub_req_size);
86         OBD_FREE_PTR(dt_update);
87 }
88 EXPORT_SYMBOL(dt_update_request_destroy);
89
90 /**
91  * Allocate and initialize dt_update_request
92  *
93  * dt_update_request is being used to track updates being executed on
94  * this dt_device(OSD or OSP). The update buffer will be 8k initially,
95  * and increased if needed.
96  *
97  * \param [in] dt       dt device
98  *
99  * \retval              dt_update_request being allocated if succeed
100  * \retval              ERR_PTR(errno) if failed
101  */
102 struct dt_update_request *dt_update_request_create(struct dt_device *dt)
103 {
104         struct dt_update_request *dt_update;
105         struct object_update_request *ourq;
106
107         OBD_ALLOC_PTR(dt_update);
108         if (!dt_update)
109                 return ERR_PTR(-ENOMEM);
110
111         ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
112         if (IS_ERR(ourq)) {
113                 OBD_FREE_PTR(dt_update);
114                 return ERR_CAST(ourq);
115         }
116
117         dt_update->dur_buf.ub_req = ourq;
118         dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
119
120         INIT_LIST_HEAD(&dt_update->dur_list);
121         dt_update->dur_dt = dt;
122         dt_update->dur_batchid = 0;
123         INIT_LIST_HEAD(&dt_update->dur_cb_items);
124
125         return dt_update;
126 }
127 EXPORT_SYMBOL(dt_update_request_create);
128
129 /**
130  * Find or create dt_update_request.
131  *
132  * Find or create one loc in th_dev/dev_obj_update for the update,
133  * Because only one thread can access this thandle, no need
134  * lock now.
135  *
136  * \param[in] th        transaction handle
137  * \param[in] dt        lookup update request by dt_object
138  *
139  * \retval              pointer of dt_update_request if it can be created
140  *                      or found.
141  * \retval              ERR_PTR(errno) if it can not be created or found.
142  */
143 struct dt_update_request *
144 dt_update_request_find_or_create(struct thandle *th, struct dt_object *dt)
145 {
146         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
147         struct thandle_update   *tu = th->th_update;
148         struct dt_update_request *update;
149         ENTRY;
150
151         if (tu == NULL) {
152                 OBD_ALLOC_PTR(tu);
153                 if (tu == NULL)
154                         RETURN(ERR_PTR(-ENOMEM));
155
156                 INIT_LIST_HEAD(&tu->tu_remote_update_list);
157                 tu->tu_sent_after_local_trans = 0;
158                 th->th_update = tu;
159         }
160
161         update = out_find_update(tu, dt_dev);
162         if (update != NULL)
163                 RETURN(update);
164
165         update = dt_update_request_create(dt_dev);
166         if (IS_ERR(update))
167                 RETURN(update);
168
169         list_add_tail(&update->dur_list, &tu->tu_remote_update_list);
170
171         if (!tu->tu_only_remote_trans)
172                 thandle_get(th);
173
174         RETURN(update);
175 }
176 EXPORT_SYMBOL(dt_update_request_find_or_create);
177
178 /**
179  * resize update buffer
180  *
181  * Extend the update buffer by new_size.
182  *
183  * \param[in] ubuf      update buffer to be extended
184  * \param[in] new_size  new size of the update buffer
185  *
186  * \retval              0 if extending succeeds.
187  * \retval              negative errno if extending fails.
188  */
189 static int update_buffer_resize(struct update_buffer *ubuf, size_t new_size)
190 {
191         struct object_update_request *ureq;
192
193         if (new_size > ubuf->ub_req_size)
194                 return 0;
195
196         OBD_ALLOC_LARGE(ureq, new_size);
197         if (ureq == NULL)
198                 return -ENOMEM;
199
200         memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size);
201
202         OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size);
203
204         ubuf->ub_req = ureq;
205         ubuf->ub_req_size = new_size;
206
207         return 0;
208 }
209
210 /**
211  * Pack the header of object_update_request
212  *
213  * Packs updates into the update_buffer header, which will either be sent to
214  * the remote MDT or stored in the local update log. The maximum update buffer
215  * size is 1MB for now.
216  *
217  * \param[in] env       execution environment
218  * \param[in] ubuf      update bufer which it will pack the update in
219  * \param[in] op        update operation
220  * \param[in] fid       object FID for this update
221  * \param[in] param_count       parameters count for this update
222  * \param[in] lens      each parameters length of this update
223  * \param[in] batchid   batchid(transaction no) of this update
224  *
225  * \retval              0 pack update succeed.
226  *                      negative errno pack update failed.
227  **/
228 static struct object_update*
229 out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf,
230                        enum update_type op, const struct lu_fid *fid,
231                        int params_count, __u16 *param_sizes, __u64 batchid)
232 {
233         struct object_update_request    *ureq = ubuf->ub_req;
234         size_t                          ureq_size = ubuf->ub_req_size;
235         struct object_update            *obj_update;
236         struct object_update_param      *param;
237         size_t                          update_size;
238         int                             rc = 0;
239         unsigned int                    i;
240         ENTRY;
241
242         /* Check update size to make sure it can fit into the buffer */
243         ureq_size = object_update_request_size(ureq);
244         update_size = offsetof(struct object_update, ou_params[0]);
245         for (i = 0; i < params_count; i++)
246                 update_size += cfs_size_round(param_sizes[i] + sizeof(*param));
247
248         if (unlikely(cfs_size_round(ureq_size + update_size) >
249                      ubuf->ub_req_size)) {
250                 size_t new_size = ubuf->ub_req_size;
251
252                 /* enlarge object update request size */
253                 while (new_size <
254                        cfs_size_round(ureq_size + update_size))
255                         new_size += OUT_UPDATE_BUFFER_SIZE_ADD;
256                 if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX)
257                         RETURN(ERR_PTR(-E2BIG));
258
259                 rc = update_buffer_resize(ubuf, new_size);
260                 if (rc < 0)
261                         RETURN(ERR_PTR(rc));
262
263                 ureq = ubuf->ub_req;
264         }
265
266         /* fill the update into the update buffer */
267         obj_update = (struct object_update *)((char *)ureq + ureq_size);
268         obj_update->ou_fid = *fid;
269         obj_update->ou_type = op;
270         obj_update->ou_params_count = (__u16)params_count;
271         obj_update->ou_batchid = batchid;
272         param = &obj_update->ou_params[0];
273         for (i = 0; i < params_count; i++) {
274                 param->oup_len = param_sizes[i];
275                 param = (struct object_update_param *)((char *)param +
276                          object_update_param_size(param));
277         }
278         ureq->ourq_count++;
279
280         CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n",
281                ureq, PFID(fid), ureq->ourq_count, op, params_count,
282                (int)update_size);
283
284         RETURN(obj_update);
285 }
286
287 /**
288  * Packs one update into the update_buffer.
289  *
290  * \param[in] env       execution environment
291  * \param[in] ubuf      bufer where update will be packed
292  * \param[in] op        update operation (enum update_type)
293  * \param[in] fid       object FID for this update
294  * \param[in] param_count       number of parameters for this update
295  * \param[in] param_sizes       array of parameters length of this update
296  * \param[in] param_bufs        parameter buffers
297  * \param[in] batchid   transaction no of this update, plus mdt_index, which
298  *                      will be globally unique
299  *
300  * \retval              = 0 if updates packing succeeds
301  * \retval              negative errno if updates packing fails
302  **/
303 int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
304                     enum update_type op, const struct lu_fid *fid,
305                     int params_count, __u16 *param_sizes,
306                     const void **param_bufs, __u64 batchid)
307 {
308         struct object_update            *update;
309         struct object_update_param      *param;
310         unsigned int                    i;
311         ENTRY;
312
313         update = out_update_header_pack(env, ubuf, op, fid, params_count,
314                                         param_sizes, batchid);
315         if (IS_ERR(update))
316                 RETURN(PTR_ERR(update));
317
318         param = &update->ou_params[0];
319         for (i = 0; i < params_count; i++) {
320                 memcpy(&param->oup_buf[0], param_bufs[i], param_sizes[i]);
321                 param = (struct object_update_param *)((char *)param +
322                          object_update_param_size(param));
323         }
324
325         RETURN(0);
326 }
327 EXPORT_SYMBOL(out_update_pack);
328
329 /**
330  * Pack various updates into the update_buffer.
331  *
332  * The following functions pack different updates into the update_buffer
333  * So parameters of these API is basically same as its correspondent OSD/OSP
334  * API, for detail description of these parameters see osd_handler.c or
335  * osp_md_object.c.
336  *
337  * \param[in] env       execution environment
338  * \param[in] ubuf      update buffer
339  * \param[in] fid       fid of this object for the update
340  * \param[in] batchid   batch id of this update
341  *
342  * \retval              0 if insertion succeeds.
343  * \retval              negative errno if insertion fails.
344  */
345 int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf,
346                     const struct lu_fid *fid, struct lu_attr *attr,
347                     struct dt_allocation_hint *hint,
348                     struct dt_object_format *dof, __u64 batchid)
349 {
350         struct obdo             *obdo;
351         __u16                   sizes[2] = {sizeof(*obdo), 0};
352         int                     buf_count = 1;
353         const struct lu_fid     *fid1 = NULL;
354         struct object_update    *update;
355         ENTRY;
356
357         if (hint != NULL && hint->dah_parent) {
358                 fid1 = lu_object_fid(&hint->dah_parent->do_lu);
359                 sizes[1] = sizeof(*fid1);
360                 buf_count++;
361         }
362
363         update = out_update_header_pack(env, ubuf, OUT_CREATE, fid,
364                                         buf_count, sizes, batchid);
365         if (IS_ERR(update))
366                 RETURN(PTR_ERR(update));
367
368         obdo = object_update_param_get(update, 0, NULL);
369         obdo->o_valid = 0;
370         obdo_from_la(obdo, attr, attr->la_valid);
371         lustre_set_wire_obdo(NULL, obdo, obdo);
372         if (fid1 != NULL) {
373                 struct lu_fid *fid;
374                 fid = object_update_param_get(update, 1, NULL);
375                 fid_cpu_to_le(fid, fid1);
376         }
377
378         RETURN(0);
379 }
380 EXPORT_SYMBOL(out_create_pack);
381
382 int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
383                      const struct lu_fid *fid, __u64 batchid)
384 {
385         return out_update_pack(env, ubuf, OUT_REF_DEL, fid, 0, NULL, NULL,
386                                batchid);
387 }
388 EXPORT_SYMBOL(out_ref_del_pack);
389
390 int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf,
391                      const struct lu_fid *fid, __u64 batchid)
392 {
393         return out_update_pack(env, ubuf, OUT_REF_ADD, fid, 0, NULL, NULL,
394                                batchid);
395 }
396 EXPORT_SYMBOL(out_ref_add_pack);
397
398 int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
399                       const struct lu_fid *fid, const struct lu_attr *attr,
400                       __u64 batchid)
401 {
402         struct object_update    *update;
403         struct obdo             *obdo;
404         __u16                   size = sizeof(*obdo);
405         ENTRY;
406
407         update = out_update_header_pack(env, ubuf, OUT_ATTR_SET, fid, 1,
408                                         &size, batchid);
409         if (IS_ERR(update))
410                 RETURN(PTR_ERR(update));
411
412         obdo = object_update_param_get(update, 0, NULL);
413         obdo->o_valid = 0;
414         obdo_from_la(obdo, attr, attr->la_valid);
415         lustre_set_wire_obdo(NULL, obdo, obdo);
416
417         RETURN(0);
418 }
419 EXPORT_SYMBOL(out_attr_set_pack);
420
421 int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
422                        const struct lu_fid *fid, const struct lu_buf *buf,
423                        const char *name, int flag, __u64 batchid)
424 {
425         __u16   sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
426         const void *bufs[3] = {(char *)name, (char *)buf->lb_buf,
427                                (char *)&flag};
428
429         return out_update_pack(env, ubuf, OUT_XATTR_SET, fid,
430                                ARRAY_SIZE(sizes), sizes, bufs, batchid);
431 }
432 EXPORT_SYMBOL(out_xattr_set_pack);
433
434 int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
435                        const struct lu_fid *fid, const char *name,
436                        __u64 batchid)
437 {
438         __u16   size = strlen(name) + 1;
439
440         return out_update_pack(env, ubuf, OUT_XATTR_DEL, fid, 1, &size,
441                                (const void **)&name, batchid);
442 }
443 EXPORT_SYMBOL(out_xattr_del_pack);
444
445
446 int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf,
447                           const struct lu_fid *fid, const struct dt_rec *rec,
448                           const struct dt_key *key, __u64 batchid)
449 {
450         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
451         struct lu_fid              rec_fid;
452         __u32                       type = cpu_to_le32(rec1->rec_type);
453         __u16                       sizes[3] = { strlen((char *)key) + 1,
454                                                 sizeof(rec_fid),
455                                                 sizeof(type) };
456         const void                 *bufs[3] = { (char *)key,
457                                                 (char *)&rec_fid,
458                                                 (char *)&type };
459
460         fid_cpu_to_le(&rec_fid, rec1->rec_fid);
461
462         return out_update_pack(env, ubuf, OUT_INDEX_INSERT, fid,
463                                ARRAY_SIZE(sizes), sizes, bufs, batchid);
464 }
465 EXPORT_SYMBOL(out_index_insert_pack);
466
467 int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf,
468                           const struct lu_fid *fid, const struct dt_key *key,
469                           __u64 batchid)
470 {
471         __u16   size = strlen((char *)key) + 1;
472         const void *buf = key;
473
474         return out_update_pack(env, ubuf, OUT_INDEX_DELETE, fid, 1, &size,
475                                &buf, batchid);
476 }
477 EXPORT_SYMBOL(out_index_delete_pack);
478
479 int out_object_destroy_pack(const struct lu_env *env,
480                             struct update_buffer *ubuf,
481                             const struct lu_fid *fid, __u64 batchid)
482 {
483         return out_update_pack(env, ubuf, OUT_DESTROY, fid, 0, NULL, NULL,
484                                batchid);
485 }
486 EXPORT_SYMBOL(out_object_destroy_pack);
487
488 int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf,
489                    const struct lu_fid *fid, const struct lu_buf *buf,
490                    loff_t pos, __u64 batchid)
491 {
492         __u16           sizes[2] = {buf->lb_len, sizeof(pos)};
493         const void      *bufs[2] = {(char *)buf->lb_buf, (char *)&pos};
494         int             rc;
495
496         pos = cpu_to_le64(pos);
497
498         rc = out_update_pack(env, ubuf, OUT_WRITE, fid, ARRAY_SIZE(sizes),
499                              sizes, bufs, batchid);
500         return rc;
501 }
502 EXPORT_SYMBOL(out_write_pack);
503
504 /**
505  * Pack various readonly updates into the update_buffer.
506  *
507  * The following update funcs are only used by read-only ops, lookup,
508  * getattr etc, so it does not need transaction here. Currently they
509  * are only used by OSP.
510  *
511  * \param[in] env       execution environment
512  * \param[in] fid       fid of this object for the update
513  * \param[in] ubuf      update buffer
514  *
515  * \retval              0 if packing succeeds.
516  * \retval              negative errno if packing fails.
517  */
518 int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf,
519                           const struct lu_fid *fid, struct dt_rec *rec,
520                           const struct dt_key *key)
521 {
522         const void      *name = key;
523         __u16           size = strlen((char *)name) + 1;
524
525         return out_update_pack(env, ubuf, OUT_INDEX_LOOKUP, fid, 1, &size,
526                                &name, 0);
527 }
528 EXPORT_SYMBOL(out_index_lookup_pack);
529
530 int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
531                       const struct lu_fid *fid)
532 {
533         return out_update_pack(env, ubuf, OUT_ATTR_GET, fid, 0, NULL, NULL, 0);
534 }
535 EXPORT_SYMBOL(out_attr_get_pack);
536
537 int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
538                        const struct lu_fid *fid, const char *name)
539 {
540         __u16 size;
541
542         LASSERT(name != NULL);
543         size = strlen(name) + 1;
544         return out_update_pack(env, ubuf, OUT_XATTR_GET, fid, 1, &size,
545                                (const void **)&name, 0);
546 }
547 EXPORT_SYMBOL(out_xattr_get_pack);