Whamcloud - gitweb
a351aee324eed0e644f60a95fdf76a69d18f5b19
[fs/lustre-release.git] / lustre / target / out_lib.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/target/out_lib.c
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  * Author: Fan, Yong <fan.yong@intel.com>
30  */
31
32 #define DEBUG_SUBSYSTEM S_CLASS
33
34 #include <lu_target.h>
35 #include <lustre_update.h>
36 #include <obd.h>
37 #include <obd_class.h>
38
39 #define OUT_UPDATE_BUFFER_SIZE_ADD      4096
40 #define OUT_UPDATE_BUFFER_SIZE_MAX      (256 * 4096)  /* 1MB update size now */
41
42 struct dt_update_request*
43 out_find_update(struct thandle_update *tu, struct dt_device *dt_dev)
44 {
45         struct dt_update_request   *dt_update;
46
47         list_for_each_entry(dt_update, &tu->tu_remote_update_list,
48                             dur_list) {
49                 if (dt_update->dur_dt == dt_dev)
50                         return dt_update;
51         }
52         return NULL;
53 }
54 EXPORT_SYMBOL(out_find_update);
55
56 static struct object_update_request *object_update_request_alloc(size_t size)
57 {
58         struct object_update_request *ourq;
59
60         OBD_ALLOC_LARGE(ourq, size);
61         if (ourq == NULL)
62                 RETURN(ERR_PTR(-ENOMEM));
63
64         ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
65         ourq->ourq_count = 0;
66
67         RETURN(ourq);
68 }
69
70 static void object_update_request_free(struct object_update_request *ourq,
71                                        size_t ourq_size)
72 {
73         if (ourq != NULL)
74                 OBD_FREE_LARGE(ourq, ourq_size);
75 }
76
77 void dt_update_request_destroy(struct dt_update_request *dt_update)
78 {
79         if (dt_update == NULL)
80                 return;
81
82         list_del(&dt_update->dur_list);
83
84         object_update_request_free(dt_update->dur_buf.ub_req,
85                                    dt_update->dur_buf.ub_req_size);
86         OBD_FREE_PTR(dt_update);
87 }
88 EXPORT_SYMBOL(dt_update_request_destroy);
89
90 /**
91  * Allocate and initialize dt_update_request
92  *
93  * dt_update_request is being used to track updates being executed on
94  * this dt_device(OSD or OSP). The update buffer will be 8k initially,
95  * and increased if needed.
96  *
97  * \param [in] dt       dt device
98  *
99  * \retval              dt_update_request being allocated if succeed
100  * \retval              ERR_PTR(errno) if failed
101  */
102 struct dt_update_request *dt_update_request_create(struct dt_device *dt)
103 {
104         struct dt_update_request *dt_update;
105         struct object_update_request *ourq;
106
107         OBD_ALLOC_PTR(dt_update);
108         if (!dt_update)
109                 return ERR_PTR(-ENOMEM);
110
111         ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
112         if (IS_ERR(ourq)) {
113                 OBD_FREE_PTR(dt_update);
114                 return ERR_CAST(ourq);
115         }
116
117         dt_update->dur_buf.ub_req = ourq;
118         dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
119
120         INIT_LIST_HEAD(&dt_update->dur_list);
121         dt_update->dur_dt = dt;
122         dt_update->dur_batchid = 0;
123         INIT_LIST_HEAD(&dt_update->dur_cb_items);
124
125         return dt_update;
126 }
127 EXPORT_SYMBOL(dt_update_request_create);
128
129 /**
130  * Find or create dt_update_request.
131  *
132  * Find or create one loc in th_dev/dev_obj_update for the update,
133  * Because only one thread can access this thandle, no need
134  * lock now.
135  *
136  * \param[in] th        transaction handle
137  * \param[in] dt        lookup update request by dt_object
138  *
139  * \retval              pointer of dt_update_request if it can be created
140  *                      or found.
141  * \retval              ERR_PTR(errno) if it can not be created or found.
142  */
143 struct dt_update_request *
144 dt_update_request_find_or_create(struct thandle *th, struct dt_object *dt)
145 {
146         struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
147         struct thandle_update   *tu = th->th_update;
148         struct dt_update_request *update;
149         ENTRY;
150
151         if (tu == NULL) {
152                 OBD_ALLOC_PTR(tu);
153                 if (tu == NULL)
154                         RETURN(ERR_PTR(-ENOMEM));
155
156                 INIT_LIST_HEAD(&tu->tu_remote_update_list);
157                 tu->tu_sent_after_local_trans = 0;
158                 th->th_update = tu;
159         }
160
161         update = out_find_update(tu, dt_dev);
162         if (update != NULL)
163                 RETURN(update);
164
165         update = dt_update_request_create(dt_dev);
166         if (IS_ERR(update))
167                 RETURN(update);
168
169         list_add_tail(&update->dur_list, &tu->tu_remote_update_list);
170
171         if (!tu->tu_only_remote_trans)
172                 thandle_get(th);
173
174         RETURN(update);
175 }
176 EXPORT_SYMBOL(dt_update_request_find_or_create);
177
178 /**
179  * Prepare update request.
180  *
181  * Prepare OUT update ptlrpc request, and the request usually includes
182  * all of updates (stored in \param ureq) from one operation.
183  *
184  * \param[in] env       execution environment
185  * \param[in] imp       import on which ptlrpc request will be sent
186  * \param[in] ureq      hold all of updates which will be packed into the req
187  * \param[in] reqp      request to be created
188  *
189  * \retval              0 if preparation succeeds.
190  * \retval              negative errno if preparation fails.
191  */
192 int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
193                         const struct object_update_request *ureq,
194                         struct ptlrpc_request **reqp)
195 {
196         struct ptlrpc_request           *req;
197         struct object_update_request    *tmp;
198         int                             ureq_len;
199         int                             rc;
200         ENTRY;
201
202         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
203         if (req == NULL)
204                 RETURN(-ENOMEM);
205
206         ureq_len = object_update_request_size(ureq);
207         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
208                              ureq_len);
209
210         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
211         if (rc != 0) {
212                 ptlrpc_req_finished(req);
213                 RETURN(rc);
214         }
215
216         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
217                              RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
218
219         tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
220         memcpy(tmp, ureq, ureq_len);
221
222         ptlrpc_request_set_replen(req);
223         req->rq_request_portal = OUT_PORTAL;
224         req->rq_reply_portal = OSC_REPLY_PORTAL;
225         *reqp = req;
226
227         RETURN(rc);
228 }
229 EXPORT_SYMBOL(out_prep_update_req);
230
231 /**
232  * Send update RPC.
233  *
234  * Send update request to the remote MDT synchronously.
235  *
236  * \param[in] env       execution environment
237  * \param[in] imp       import on which ptlrpc request will be sent
238  * \param[in] dt_update hold all of updates which will be packed into the req
239  * \param[in] reqp      request to be created
240  *
241  * \retval              0 if RPC succeeds.
242  * \retval              negative errno if RPC fails.
243  */
244 int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
245                     struct dt_update_request *dt_update,
246                     struct ptlrpc_request **reqp)
247 {
248         struct ptlrpc_request   *req = NULL;
249         int                     rc;
250         ENTRY;
251
252         rc = out_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
253         if (rc != 0)
254                 RETURN(rc);
255
256         /* Note: some dt index api might return non-zero result here, like
257          * osd_index_ea_lookup, so we should only check rc < 0 here */
258         rc = ptlrpc_queue_wait(req);
259         if (rc < 0) {
260                 ptlrpc_req_finished(req);
261                 dt_update->dur_rc = rc;
262                 RETURN(rc);
263         }
264
265         if (reqp != NULL) {
266                 *reqp = req;
267                 RETURN(rc);
268         }
269
270         dt_update->dur_rc = rc;
271
272         ptlrpc_req_finished(req);
273
274         RETURN(rc);
275 }
276 EXPORT_SYMBOL(out_remote_sync);
277
278 /**
279  * resize update buffer
280  *
281  * Extend the update buffer by new_size.
282  *
283  * \param[in] ubuf      update buffer to be extended
284  * \param[in] new_size  new size of the update buffer
285  *
286  * \retval              0 if extending succeeds.
287  * \retval              negative errno if extending fails.
288  */
289 static int update_buffer_resize(struct update_buffer *ubuf, size_t new_size)
290 {
291         struct object_update_request *ureq;
292
293         if (new_size > ubuf->ub_req_size)
294                 return 0;
295
296         OBD_ALLOC_LARGE(ureq, new_size);
297         if (ureq == NULL)
298                 return -ENOMEM;
299
300         memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size);
301
302         OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size);
303
304         ubuf->ub_req = ureq;
305         ubuf->ub_req_size = new_size;
306
307         return 0;
308 }
309
310 /**
311  * Pack the header of object_update_request
312  *
313  * Packs updates into the update_buffer header, which will either be sent to
314  * the remote MDT or stored in the local update log. The maximum update buffer
315  * size is 1MB for now.
316  *
317  * \param[in] env       execution environment
318  * \param[in] ubuf      update bufer which it will pack the update in
319  * \param[in] op        update operation
320  * \param[in] fid       object FID for this update
321  * \param[in] param_count       parameters count for this update
322  * \param[in] lens      each parameters length of this update
323  * \param[in] batchid   batchid(transaction no) of this update
324  *
325  * \retval              0 pack update succeed.
326  *                      negative errno pack update failed.
327  **/
328 static struct object_update*
329 out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf,
330                        enum update_type op, const struct lu_fid *fid,
331                        int params_count, __u16 *param_sizes, __u64 batchid)
332 {
333         struct object_update_request    *ureq = ubuf->ub_req;
334         size_t                          ureq_size = ubuf->ub_req_size;
335         struct object_update            *obj_update;
336         struct object_update_param      *param;
337         size_t                          update_size;
338         int                             rc = 0;
339         unsigned int                    i;
340         ENTRY;
341
342         /* Check update size to make sure it can fit into the buffer */
343         ureq_size = object_update_request_size(ureq);
344         update_size = offsetof(struct object_update, ou_params[0]);
345         for (i = 0; i < params_count; i++)
346                 update_size += cfs_size_round(param_sizes[i] + sizeof(*param));
347
348         if (unlikely(cfs_size_round(ureq_size + update_size) >
349                      ubuf->ub_req_size)) {
350                 size_t new_size = ubuf->ub_req_size;
351
352                 /* enlarge object update request size */
353                 while (new_size <
354                        cfs_size_round(ureq_size + update_size))
355                         new_size += OUT_UPDATE_BUFFER_SIZE_ADD;
356                 if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX)
357                         RETURN(ERR_PTR(-E2BIG));
358
359                 rc = update_buffer_resize(ubuf, new_size);
360                 if (rc < 0)
361                         RETURN(ERR_PTR(rc));
362
363                 ureq = ubuf->ub_req;
364         }
365
366         /* fill the update into the update buffer */
367         obj_update = (struct object_update *)((char *)ureq + ureq_size);
368         obj_update->ou_fid = *fid;
369         obj_update->ou_type = op;
370         obj_update->ou_params_count = (__u16)params_count;
371         obj_update->ou_batchid = batchid;
372         param = &obj_update->ou_params[0];
373         for (i = 0; i < params_count; i++) {
374                 param->oup_len = param_sizes[i];
375                 param = (struct object_update_param *)((char *)param +
376                          object_update_param_size(param));
377         }
378         ureq->ourq_count++;
379
380         CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n",
381                ureq, PFID(fid), ureq->ourq_count, op, params_count,
382                (int)update_size);
383
384         RETURN(obj_update);
385 }
386
387 /**
388  * Packs one update into the update_buffer.
389  *
390  * \param[in] env       execution environment
391  * \param[in] ubuf      bufer where update will be packed
392  * \param[in] op        update operation (enum update_type)
393  * \param[in] fid       object FID for this update
394  * \param[in] param_count       number of parameters for this update
395  * \param[in] param_sizes       array of parameters length of this update
396  * \param[in] param_bufs        parameter buffers
397  * \param[in] batchid   transaction no of this update, plus mdt_index, which
398  *                      will be globally unique
399  *
400  * \retval              = 0 if updates packing succeeds
401  * \retval              negative errno if updates packing fails
402  **/
403 int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
404                     enum update_type op, const struct lu_fid *fid,
405                     int params_count, __u16 *param_sizes,
406                     const void **param_bufs, __u64 batchid)
407 {
408         struct object_update            *update;
409         struct object_update_param      *param;
410         unsigned int                    i;
411         ENTRY;
412
413         update = out_update_header_pack(env, ubuf, op, fid, params_count,
414                                         param_sizes, batchid);
415         if (IS_ERR(update))
416                 RETURN(PTR_ERR(update));
417
418         param = &update->ou_params[0];
419         for (i = 0; i < params_count; i++) {
420                 memcpy(&param->oup_buf[0], param_bufs[i], param_sizes[i]);
421                 param = (struct object_update_param *)((char *)param +
422                          object_update_param_size(param));
423         }
424
425         RETURN(0);
426 }
427 EXPORT_SYMBOL(out_update_pack);
428
429 /**
430  * Pack various updates into the update_buffer.
431  *
432  * The following functions pack different updates into the update_buffer
433  * So parameters of these API is basically same as its correspondent OSD/OSP
434  * API, for detail description of these parameters see osd_handler.c or
435  * osp_md_object.c.
436  *
437  * \param[in] env       execution environment
438  * \param[in] ubuf      update buffer
439  * \param[in] fid       fid of this object for the update
440  * \param[in] batchid   batch id of this update
441  *
442  * \retval              0 if insertion succeeds.
443  * \retval              negative errno if insertion fails.
444  */
445 int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf,
446                     const struct lu_fid *fid, struct lu_attr *attr,
447                     struct dt_allocation_hint *hint,
448                     struct dt_object_format *dof, __u64 batchid)
449 {
450         struct obdo             *obdo;
451         __u16                   sizes[2] = {sizeof(*obdo), 0};
452         int                     buf_count = 1;
453         const struct lu_fid     *fid1 = NULL;
454         struct object_update    *update;
455         ENTRY;
456
457         if (hint != NULL && hint->dah_parent) {
458                 fid1 = lu_object_fid(&hint->dah_parent->do_lu);
459                 sizes[1] = sizeof(*fid1);
460                 buf_count++;
461         }
462
463         update = out_update_header_pack(env, ubuf, OUT_CREATE, fid,
464                                         buf_count, sizes, batchid);
465         if (IS_ERR(update))
466                 RETURN(PTR_ERR(update));
467
468         obdo = object_update_param_get(update, 0, NULL);
469         obdo->o_valid = 0;
470         obdo_from_la(obdo, attr, attr->la_valid);
471         lustre_set_wire_obdo(NULL, obdo, obdo);
472         if (fid1 != NULL) {
473                 struct lu_fid *fid;
474                 fid = object_update_param_get(update, 1, NULL);
475                 fid_cpu_to_le(fid, fid1);
476         }
477
478         RETURN(0);
479 }
480 EXPORT_SYMBOL(out_create_pack);
481
482 int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
483                      const struct lu_fid *fid, __u64 batchid)
484 {
485         return out_update_pack(env, ubuf, OUT_REF_DEL, fid, 0, NULL, NULL,
486                                batchid);
487 }
488 EXPORT_SYMBOL(out_ref_del_pack);
489
490 int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf,
491                      const struct lu_fid *fid, __u64 batchid)
492 {
493         return out_update_pack(env, ubuf, OUT_REF_ADD, fid, 0, NULL, NULL,
494                                batchid);
495 }
496 EXPORT_SYMBOL(out_ref_add_pack);
497
498 int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
499                       const struct lu_fid *fid, const struct lu_attr *attr,
500                       __u64 batchid)
501 {
502         struct object_update    *update;
503         struct obdo             *obdo;
504         __u16                   size = sizeof(*obdo);
505         ENTRY;
506
507         update = out_update_header_pack(env, ubuf, OUT_ATTR_SET, fid, 1,
508                                         &size, batchid);
509         if (IS_ERR(update))
510                 RETURN(PTR_ERR(update));
511
512         obdo = object_update_param_get(update, 0, NULL);
513         obdo->o_valid = 0;
514         obdo_from_la(obdo, attr, attr->la_valid);
515         lustre_set_wire_obdo(NULL, obdo, obdo);
516
517         RETURN(0);
518 }
519 EXPORT_SYMBOL(out_attr_set_pack);
520
521 int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
522                        const struct lu_fid *fid, const struct lu_buf *buf,
523                        const char *name, int flag, __u64 batchid)
524 {
525         __u16   sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
526         const void *bufs[3] = {(char *)name, (char *)buf->lb_buf,
527                                (char *)&flag};
528
529         return out_update_pack(env, ubuf, OUT_XATTR_SET, fid,
530                                ARRAY_SIZE(sizes), sizes, bufs, batchid);
531 }
532 EXPORT_SYMBOL(out_xattr_set_pack);
533
534 int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
535                        const struct lu_fid *fid, const char *name,
536                        __u64 batchid)
537 {
538         __u16   size = strlen(name) + 1;
539
540         return out_update_pack(env, ubuf, OUT_XATTR_DEL, fid, 1, &size,
541                                (const void **)&name, batchid);
542 }
543 EXPORT_SYMBOL(out_xattr_del_pack);
544
545
546 int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf,
547                           const struct lu_fid *fid, const struct dt_rec *rec,
548                           const struct dt_key *key, __u64 batchid)
549 {
550         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
551         struct lu_fid              rec_fid;
552         __u32                       type = cpu_to_le32(rec1->rec_type);
553         __u16                       sizes[3] = { strlen((char *)key) + 1,
554                                                 sizeof(rec_fid),
555                                                 sizeof(type) };
556         const void                 *bufs[3] = { (char *)key,
557                                                 (char *)&rec_fid,
558                                                 (char *)&type };
559
560         fid_cpu_to_le(&rec_fid, rec1->rec_fid);
561
562         return out_update_pack(env, ubuf, OUT_INDEX_INSERT, fid,
563                                ARRAY_SIZE(sizes), sizes, bufs, batchid);
564 }
565 EXPORT_SYMBOL(out_index_insert_pack);
566
567 int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf,
568                           const struct lu_fid *fid, const struct dt_key *key,
569                           __u64 batchid)
570 {
571         __u16   size = strlen((char *)key) + 1;
572         const void *buf = key;
573
574         return out_update_pack(env, ubuf, OUT_INDEX_DELETE, fid, 1, &size,
575                                &buf, batchid);
576 }
577 EXPORT_SYMBOL(out_index_delete_pack);
578
579 int out_object_destroy_pack(const struct lu_env *env,
580                             struct update_buffer *ubuf,
581                             const struct lu_fid *fid, __u64 batchid)
582 {
583         return out_update_pack(env, ubuf, OUT_DESTROY, fid, 0, NULL, NULL,
584                                batchid);
585 }
586 EXPORT_SYMBOL(out_object_destroy_pack);
587
588 int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf,
589                    const struct lu_fid *fid, const struct lu_buf *buf,
590                    loff_t pos, __u64 batchid)
591 {
592         __u16           sizes[2] = {buf->lb_len, sizeof(pos)};
593         const void      *bufs[2] = {(char *)buf->lb_buf, (char *)&pos};
594         int             rc;
595
596         pos = cpu_to_le64(pos);
597
598         rc = out_update_pack(env, ubuf, OUT_WRITE, fid, ARRAY_SIZE(sizes),
599                              sizes, bufs, batchid);
600         return rc;
601 }
602 EXPORT_SYMBOL(out_write_pack);
603
604 /**
605  * Pack various readonly updates into the update_buffer.
606  *
607  * The following update funcs are only used by read-only ops, lookup,
608  * getattr etc, so it does not need transaction here. Currently they
609  * are only used by OSP.
610  *
611  * \param[in] env       execution environment
612  * \param[in] fid       fid of this object for the update
613  * \param[in] ubuf      update buffer
614  *
615  * \retval              0 if packing succeeds.
616  * \retval              negative errno if packing fails.
617  */
618 int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf,
619                           const struct lu_fid *fid, struct dt_rec *rec,
620                           const struct dt_key *key)
621 {
622         const void      *name = key;
623         __u16           size = strlen((char *)name) + 1;
624
625         return out_update_pack(env, ubuf, OUT_INDEX_LOOKUP, fid, 1, &size,
626                                &name, 0);
627 }
628 EXPORT_SYMBOL(out_index_lookup_pack);
629
630 int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
631                       const struct lu_fid *fid)
632 {
633         return out_update_pack(env, ubuf, OUT_ATTR_GET, fid, 0, NULL, NULL, 0);
634 }
635 EXPORT_SYMBOL(out_attr_get_pack);
636
637 int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
638                        const struct lu_fid *fid, const char *name)
639 {
640         __u16 size;
641
642         LASSERT(name != NULL);
643         size = strlen(name) + 1;
644         return out_update_pack(env, ubuf, OUT_XATTR_GET, fid, 1, &size,
645                                (const void **)&name, 0);
646 }
647 EXPORT_SYMBOL(out_xattr_get_pack);