Whamcloud - gitweb
LU-3534 osp: transfer updates with bulk RPC
[fs/lustre-release.git] / lustre / include / lustre_update.h
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.htm
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/include/lustre_update.h
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  */
30
31 #ifndef _LUSTRE_UPDATE_H
32 #define _LUSTRE_UPDATE_H
33 #include <lustre_net.h>
34 #include <dt_object.h>
35
36 #define OUT_UPDATE_INIT_BUFFER_SIZE     4096
37 /* 16KB, the current biggest size is llog header(8KB) */
38 #define OUT_UPDATE_REPLY_SIZE           16384
39
40 struct dt_key;
41 struct dt_rec;
42 struct object_update_param;
43
44 struct update_params {
45         struct object_update_param      up_params[0];
46 };
47
48 static inline size_t update_params_size(const struct update_params *params,
49                                         unsigned int param_count)
50 {
51         struct object_update_param      *param;
52         size_t total_size = sizeof(*params);
53         unsigned int i;
54
55         param = (struct object_update_param *)&params->up_params[0];
56         for (i = 0; i < param_count; i++) {
57                 size_t size = object_update_param_size(param);
58
59                 param = (struct object_update_param *)((char *)param + size);
60                 total_size += size;
61         }
62
63         return total_size;
64 }
65
66 static inline struct object_update_param *
67 update_params_get_param(const struct update_params *params,
68                         unsigned int index, unsigned int param_count)
69 {
70         struct object_update_param *param;
71         unsigned int            i;
72
73         if (index > param_count)
74                 return NULL;
75
76         param = (struct object_update_param *)&params->up_params[0];
77         for (i = 0; i < index; i++)
78                 param = (struct object_update_param *)((char *)param +
79                         object_update_param_size(param));
80
81         return param;
82 }
83
84 static inline void*
85 update_params_get_param_buf(const struct update_params *params, __u16 index,
86                             unsigned int param_count, __u16 *size)
87 {
88         struct object_update_param *param;
89
90         param = update_params_get_param(params, (unsigned int)index,
91                                         param_count);
92         if (param == NULL)
93                 return NULL;
94
95         if (size != NULL)
96                 *size = param->oup_len;
97
98         return param->oup_buf;
99 }
100
101 struct update_op {
102         struct lu_fid uop_fid;
103         __u16   uop_type;
104         __u16   uop_param_count;
105         __u16   uop_params_off[0];
106 };
107
108 static inline size_t
109 update_op_size(unsigned int param_count)
110 {
111         return offsetof(struct update_op, uop_params_off[param_count]);
112 }
113
114 static inline struct update_op *
115 update_op_next_op(const struct update_op *uop)
116 {
117         return (struct update_op *)((char *)uop +
118                                 update_op_size(uop->uop_param_count));
119 }
120
121 /* All of updates in the mulitple_update_record */
122 struct update_ops {
123         struct update_op        uops_op[0];
124 };
125
126 static inline size_t update_ops_size(const struct update_ops *ops,
127                                      unsigned int update_count)
128 {
129         struct update_op *op;
130         size_t total_size = sizeof(*ops);
131         unsigned int i;
132
133         op = (struct update_op *)&ops->uops_op[0];
134         for (i = 0; i < update_count; i++, op = update_op_next_op(op))
135                 total_size += update_op_size(op->uop_param_count);
136
137         return total_size;
138 }
139
140 /*
141  * This is the update record format used to store the updates in
142  * disk. All updates of the operation will be stored in ur_ops.
143  * All of parameters for updates of the operation will be stored
144  * in ur_params.
145  * To save the space of the record, parameters in ur_ops will only
146  * remember their offset in ur_params, so to avoid storing duplicate
147  * parameters in ur_params, which can help us save a lot space for
148  * operation like creating striped directory.
149  */
150 struct update_records {
151         __u64                   ur_master_transno;
152         __u64                   ur_batchid;
153         __u32                   ur_flags;
154         __u32                   ur_param_count;
155         __u32                   ur_update_count;
156         struct update_ops       ur_ops;
157          /* Note ur_ops has a variable size, so comment out
158           * the following ur_params, in case some use it directly
159           * update_records->ur_params
160           *
161           * struct update_params        ur_params;
162           */
163 };
164
165 struct llog_update_record {
166         struct llog_rec_hdr     lur_hdr;
167         struct update_records   lur_update_rec;
168         /* Note ur_update_rec has a variable size, so comment out
169          * the following ur_tail, in case someone use it directly
170          *
171          * struct llog_rec_tail lur_tail;
172          */
173 };
174
175 static inline struct update_params *
176 update_records_get_params(const struct update_records *record)
177 {
178         return (struct update_params *)((char *)record +
179                 offsetof(struct update_records, ur_ops) +
180                 update_ops_size(&record->ur_ops, record->ur_update_count));
181 }
182
183 static inline size_t
184 update_records_size(const struct update_records *record)
185 {
186         struct update_params *params;
187
188         params = update_records_get_params(record);
189
190         return cfs_size_round(offsetof(struct update_records, ur_ops) +
191                update_ops_size(&record->ur_ops, record->ur_update_count) +
192                update_params_size(params, record->ur_param_count));
193 }
194
195 static inline size_t
196 llog_update_record_size(const struct llog_update_record *lur)
197 {
198         return cfs_size_round(sizeof(lur->lur_hdr) +
199                               update_records_size(&lur->lur_update_rec) +
200                               sizeof(struct llog_rec_tail));
201 }
202
203 static inline struct update_op *
204 update_ops_get_op(const struct update_ops *ops, unsigned int index,
205                   unsigned int update_count)
206 {
207         struct update_op *op;
208         unsigned int i;
209
210         if (index > update_count)
211                 return NULL;
212
213         op = (struct update_op *)&ops->uops_op[0];
214         for (i = 0; i < index; i++)
215                 op = update_op_next_op(op);
216
217         return op;
218 }
219
220 static inline void
221 *object_update_param_get(const struct object_update *update, size_t index,
222                          size_t *size)
223 {
224         const struct    object_update_param *param;
225         size_t          i;
226
227         if (index >= update->ou_params_count)
228                 return ERR_PTR(-EINVAL);
229
230         param = &update->ou_params[0];
231         for (i = 0; i < index; i++)
232                 param = (struct object_update_param *)((char *)param +
233                         object_update_param_size(param));
234
235         if (size != NULL)
236                 *size = param->oup_len;
237
238         if (param->oup_len == 0)
239                 return NULL;
240
241         return (void *)&param->oup_buf[0];
242 }
243
244 static inline unsigned long
245 object_update_request_size(const struct object_update_request *our)
246 {
247         unsigned long   size;
248         size_t          i = 0;
249
250         size = offsetof(struct object_update_request, ourq_updates[0]);
251         for (i = 0; i < our->ourq_count; i++) {
252                 struct object_update *update;
253
254                 update = (struct object_update *)((char *)our + size);
255                 size += object_update_size(update);
256         }
257         return size;
258 }
259
260 static inline void
261 object_update_result_insert(struct object_update_reply *reply,
262                             void *data, size_t data_len, size_t index,
263                             int rc)
264 {
265         struct object_update_result *update_result;
266         char *ptr;
267
268         update_result = object_update_result_get(reply, index, NULL);
269         LASSERT(update_result != NULL);
270
271         update_result->our_rc = ptlrpc_status_hton(rc);
272         if (data_len > 0) {
273                 LASSERT(data != NULL);
274                 ptr = (char *)update_result +
275                         cfs_size_round(sizeof(struct object_update_reply));
276                 update_result->our_datalen = data_len;
277                 memcpy(ptr, data, data_len);
278         }
279
280         reply->ourp_lens[index] = cfs_size_round(data_len +
281                                         sizeof(struct object_update_result));
282 }
283
284 static inline int
285 object_update_result_data_get(const struct object_update_reply *reply,
286                               struct lu_buf *lbuf, size_t index)
287 {
288         struct object_update_result *update_result;
289         size_t size = 0;
290         int    result;
291
292         LASSERT(lbuf != NULL);
293         update_result = object_update_result_get(reply, index, &size);
294         if (update_result == NULL ||
295             size < cfs_size_round(sizeof(struct object_update_reply)) ||
296             update_result->our_datalen > size)
297                 RETURN(-EFAULT);
298
299         result = ptlrpc_status_ntoh(update_result->our_rc);
300         if (result < 0)
301                 return result;
302
303         lbuf->lb_buf = update_result->our_data;
304         lbuf->lb_len = update_result->our_datalen;
305
306         return 0;
307 }
308
309 /**
310  * Attached in the thandle to record the updates for distribute
311  * distribution.
312  */
313 struct thandle_update_records {
314         /* All of updates for the cross-MDT operation. */
315         struct llog_update_record       *tur_update_records;
316         size_t                          tur_update_records_buf_size;
317
318         /* All of parameters for the cross-MDT operation */
319         struct update_params    *tur_update_params;
320         unsigned int            tur_update_param_count;
321         size_t                  tur_update_params_buf_size;
322 };
323
324 #define TOP_THANDLE_MAGIC       0x20140917
325 struct top_multiple_thandle {
326         struct dt_device        *tmt_master_sub_dt;
327         atomic_t                tmt_refcount;
328         /* Other sub transactions will be listed here. */
329         struct list_head        tmt_sub_thandle_list;
330
331         struct list_head        tmt_commit_list;
332         /* All of update records will packed here */
333         struct thandle_update_records *tmt_update_records;
334
335         wait_queue_head_t       tmt_stop_waitq;
336         __u64                   tmt_batchid;
337         int                     tmt_result;
338         __u32                   tmt_magic;
339         __u32                   tmt_committed:1;
340 };
341
342 /* {top,sub}_thandle are used to manage distributed transactions which
343  * include updates on several nodes. A top_handle represents the
344  * whole operation, and sub_thandle represents updates on each node. */
345 struct top_thandle {
346         struct thandle          tt_super;
347         /* The master sub transaction. */
348         struct thandle          *tt_master_sub_thandle;
349
350         struct top_multiple_thandle *tt_multiple_thandle;
351 };
352
353 /* Sub thandle is used to track multiple sub thandles under one parent
354  * thandle */
355 struct sub_thandle {
356         struct thandle          *st_sub_th;
357         struct dt_device        *st_dt;
358         struct llog_cookie      st_cookie;
359         struct dt_txn_commit_cb st_commit_dcb;
360         struct dt_txn_commit_cb st_stop_dcb;
361         int                     st_result;
362
363         /* linked to top_thandle */
364         struct list_head        st_sub_list;
365
366         /* If this sub thandle is committed */
367         bool                    st_committed:1,
368                                 st_stopped:1;
369 };
370
371 struct tx_arg;
372 typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
373                               struct tx_arg *ta);
374
375 /* Structure for holding one update execution */
376 struct tx_arg {
377         tx_exec_func_t           exec_fn;
378         tx_exec_func_t           undo_fn;
379         struct dt_object        *object;
380         const char              *file;
381         struct object_update_reply *reply;
382         int                      line;
383         int                      index;
384         union {
385                 struct {
386                         struct dt_insert_rec     rec;
387                         const struct dt_key     *key;
388                 } insert;
389                 struct {
390                 } ref;
391                 struct {
392                         struct lu_attr   attr;
393                 } attr_set;
394                 struct {
395                         struct lu_buf    buf;
396                         const char      *name;
397                         int              flags;
398                         __u32            csum;
399                 } xattr_set;
400                 struct {
401                         struct lu_attr                  attr;
402                         struct dt_allocation_hint       hint;
403                         struct dt_object_format         dof;
404                         struct lu_fid                   fid;
405                 } create;
406                 struct {
407                         struct lu_buf   buf;
408                         loff_t          pos;
409                 } write;
410                 struct {
411                         struct ost_body     *body;
412                 } destroy;
413         } u;
414 };
415
416 /* Structure for holding all update executations of one transaction */
417 struct thandle_exec_args {
418         struct thandle          *ta_handle;
419         int                     ta_argno;   /* used args */
420         int                     ta_alloc_args; /* allocated args count */
421         struct tx_arg           **ta_args;
422 };
423
424 /* target/out_lib.c */
425 int out_update_pack(const struct lu_env *env, struct object_update *update,
426                     size_t *max_update_size, enum update_type op,
427                     const struct lu_fid *fid, unsigned int params_count,
428                     __u16 *param_sizes, const void **param_bufs);
429 int out_create_pack(const struct lu_env *env, struct object_update *update,
430                     size_t *max_update_size, const struct lu_fid *fid,
431                     const struct lu_attr *attr, struct dt_allocation_hint *hint,
432                     struct dt_object_format *dof);
433 int out_object_destroy_pack(const struct lu_env *env,
434                             struct object_update *update,
435                             size_t *max_update_size,
436                             const struct lu_fid *fid);
437 int out_index_delete_pack(const struct lu_env *env,
438                           struct object_update *update, size_t *max_update_size,
439                           const struct lu_fid *fid, const struct dt_key *key);
440 int out_index_insert_pack(const struct lu_env *env,
441                           struct object_update *update, size_t *max_update_size,
442                           const struct lu_fid *fid, const struct dt_rec *rec,
443                           const struct dt_key *key);
444 int out_xattr_set_pack(const struct lu_env *env,
445                        struct object_update *update, size_t *max_update_size,
446                        const struct lu_fid *fid, const struct lu_buf *buf,
447                        const char *name, __u32 flag);
448 int out_xattr_del_pack(const struct lu_env *env,
449                        struct object_update *update, size_t *max_update_size,
450                        const struct lu_fid *fid, const char *name);
451 int out_attr_set_pack(const struct lu_env *env,
452                       struct object_update *update, size_t *max_update_size,
453                       const struct lu_fid *fid, const struct lu_attr *attr);
454 int out_ref_add_pack(const struct lu_env *env,
455                      struct object_update *update, size_t *max_update_size,
456                      const struct lu_fid *fid);
457 int out_ref_del_pack(const struct lu_env *env,
458                      struct object_update *update, size_t *max_update_size,
459                      const struct lu_fid *fid);
460 int out_write_pack(const struct lu_env *env,
461                    struct object_update *update, size_t *max_update_size,
462                    const struct lu_fid *fid, const struct lu_buf *buf,
463                    __u64 pos);
464 int out_attr_get_pack(const struct lu_env *env,
465                       struct object_update *update, size_t *max_update_size,
466                       const struct lu_fid *fid);
467 int out_index_lookup_pack(const struct lu_env *env,
468                           struct object_update *update, size_t *max_update_size,
469                           const struct lu_fid *fid, struct dt_rec *rec,
470                           const struct dt_key *key);
471 int out_xattr_get_pack(const struct lu_env *env,
472                        struct object_update *update, size_t *max_update_size,
473                        const struct lu_fid *fid, const char *name);
474 int out_read_pack(const struct lu_env *env, struct object_update *update,
475                   size_t *max_update_length, const struct lu_fid *fid,
476                   size_t size, loff_t pos);
477
478 const char *update_op_str(__u16 opcode);
479
480 /* target/update_trans.c */
481 struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
482                                       struct thandle *th,
483                                       struct dt_device *sub_dt);
484
485 static inline struct thandle *
486 thandle_get_sub(const struct lu_env *env, struct thandle *th,
487                  const struct dt_object *sub_obj)
488 {
489         return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev));
490 }
491
492 struct thandle *
493 top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
494 int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
495                     struct thandle *th);
496 int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
497                    struct thandle *th);
498 void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt);
499
500 static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt)
501 {
502         atomic_inc(&tmt->tmt_refcount);
503 }
504
505 static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt)
506 {
507         if (atomic_dec_and_test(&tmt->tmt_refcount))
508                 top_multiple_thandle_destroy(tmt);
509 }
510
511 struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
512                                        struct dt_device *dt_dev);
513 int sub_thandle_trans_create(const struct lu_env *env,
514                              struct top_thandle *top_th,
515                              struct sub_thandle *st);
516
517 /* update_records.c */
518 int update_records_create_pack(const struct lu_env *env,
519                                struct update_ops *ops,
520                                unsigned int *op_count,
521                                size_t *max_ops_size,
522                                struct update_params *params,
523                                unsigned int *param_count,
524                                size_t *max_param_size,
525                                const struct lu_fid *fid,
526                                const struct lu_attr *attr,
527                                const struct dt_allocation_hint *hint,
528                                struct dt_object_format *dof);
529 int update_records_attr_set_pack(const struct lu_env *env,
530                                  struct update_ops *ops,
531                                  unsigned int *op_count,
532                                  size_t *max_ops_size,
533                                  struct update_params *params,
534                                  unsigned int *param_count,
535                                  size_t *max_param_size,
536                                  const struct lu_fid *fid,
537                                  const struct lu_attr *attr);
538 int update_records_ref_add_pack(const struct lu_env *env,
539                                 struct update_ops *ops,
540                                 unsigned int *op_count,
541                                 size_t *max_ops_size,
542                                 struct update_params *params,
543                                 unsigned int *param_count,
544                                 size_t *max_param_size,
545                                 const struct lu_fid *fid);
546 int update_records_ref_del_pack(const struct lu_env *env,
547                                 struct update_ops *ops,
548                                 unsigned int *op_count,
549                                 size_t *max_ops_size,
550                                 struct update_params *params,
551                                 unsigned int *param_count,
552                                 size_t *max_param_size,
553                                 const struct lu_fid *fid);
554 int update_records_object_destroy_pack(const struct lu_env *env,
555                                        struct update_ops *ops,
556                                        unsigned int *op_count,
557                                        size_t *max_ops_size,
558                                        struct update_params *params,
559                                        unsigned int *param_count,
560                                        size_t *max_param_size,
561                                        const struct lu_fid *fid);
562 int update_records_index_insert_pack(const struct lu_env *env,
563                                      struct update_ops *ops,
564                                      unsigned int *op_count,
565                                      size_t *max_ops_size,
566                                      struct update_params *params,
567                                      unsigned int *param_count,
568                                      size_t *max_param_size,
569                                      const struct lu_fid *fid,
570                                      const struct dt_rec *rec,
571                                      const struct dt_key *key);
572 int update_records_index_delete_pack(const struct lu_env *env,
573                                      struct update_ops *ops,
574                                      unsigned int *op_count,
575                                      size_t *max_ops_size,
576                                      struct update_params *params,
577                                      unsigned int *param_count,
578                                      size_t *max_param_size,
579                                      const struct lu_fid *fid,
580                                      const struct dt_key *key);
581 int update_records_xattr_set_pack(const struct lu_env *env,
582                                   struct update_ops *ops,
583                                   unsigned int *op_count,
584                                   size_t *max_ops_size,
585                                   struct update_params *params,
586                                   unsigned int *param_count,
587                                   size_t *max_param_size,
588                                   const struct lu_fid *fid,
589                                   const struct lu_buf *buf, const char *name,
590                                   __u32 flag);
591 int update_records_xattr_del_pack(const struct lu_env *env,
592                                   struct update_ops *ops,
593                                   unsigned int *op_count,
594                                   size_t *max_ops_size,
595                                   struct update_params *params,
596                                   unsigned int *param_count,
597                                   size_t *max_param_size,
598                                   const struct lu_fid *fid,
599                                   const char *name);
600 int update_records_write_pack(const struct lu_env *env,
601                               struct update_ops *ops,
602                               unsigned int *op_count,
603                               size_t *max_ops_size,
604                               struct update_params *params,
605                               unsigned int *param_count,
606                               size_t *max_param_size,
607                               const struct lu_fid *fid,
608                               const struct lu_buf *buf,
609                               __u64 pos);
610 int update_records_punch_pack(const struct lu_env *env,
611                               struct update_ops *ops,
612                               unsigned int *op_count,
613                               size_t *max_ops_size,
614                               struct update_params *params,
615                               unsigned int *param_count,
616                               size_t *max_param_size,
617                               const struct lu_fid *fid,
618                               __u64 start, __u64 end);
619
620 int tur_update_records_extend(struct thandle_update_records *tur,
621                               size_t new_size);
622 int tur_update_params_extend(struct thandle_update_records *tur,
623                              size_t new_size);
624 int tur_update_extend(struct thandle_update_records *tur,
625                       size_t new_op_size, size_t new_param_size);
626
627 #define update_record_pack(name, th, ...)                               \
628 ({                                                                      \
629         struct top_thandle *top_th;                                     \
630         struct top_multiple_thandle *tmt;                               \
631         struct thandle_update_records *tur;                             \
632         struct llog_update_record     *lur;                             \
633         size_t          avail_param_size;                               \
634         size_t          avail_op_size;                                  \
635         int             ret;                                            \
636                                                                         \
637         while (1) {                                                     \
638                 top_th = container_of(th, struct top_thandle, tt_super);\
639                 tmt = top_th->tt_multiple_thandle;                      \
640                 tur = tmt->tmt_update_records;                          \
641                 lur = tur->tur_update_records;                          \
642                 avail_param_size = tur->tur_update_params_buf_size -    \
643                              update_params_size(tur->tur_update_params, \
644                                         tur->tur_update_param_count);   \
645                 avail_op_size = tur->tur_update_records_buf_size -      \
646                                 llog_update_record_size(lur);           \
647                 ret = update_records_##name##_pack(env,                 \
648                                           &lur->lur_update_rec.ur_ops,  \
649                                   &lur->lur_update_rec.ur_update_count, \
650                                   &avail_op_size,                       \
651                                   tur->tur_update_params,               \
652                                   &tur->tur_update_param_count,         \
653                                   &avail_param_size, __VA_ARGS__);      \
654                 if (ret == -E2BIG) {                                    \
655                         ret = tur_update_extend(tur, avail_op_size,     \
656                                                    avail_param_size);   \
657                         if (ret != 0)                                   \
658                                 break;                                  \
659                         continue;                                       \
660                 } else {                                                \
661                         break;                                          \
662                 }                                                       \
663         }                                                               \
664         ret;                                                            \
665 })
666 #endif