Whamcloud - gitweb
LU-3540 lod: update recovery thread
[fs/lustre-release.git] / lustre / include / lustre_update.h
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.htm
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/include/lustre_update.h
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  */
30
31 #ifndef _LUSTRE_UPDATE_H
32 #define _LUSTRE_UPDATE_H
33 #include <lustre_net.h>
34 #include <dt_object.h>
35
36 #define OUT_UPDATE_INIT_BUFFER_SIZE     4096
37 /* 16KB, the current biggest size is llog header(8KB) */
38 #define OUT_UPDATE_REPLY_SIZE           16384
39
40 struct dt_key;
41 struct dt_rec;
42 struct object_update_param;
43
44 struct update_buffer {
45         struct object_update_request    *ub_req;
46         size_t                          ub_req_size;
47 };
48
49 /**
50  * Tracking the updates being executed on this dt_device.
51  */
52 struct dt_update_request {
53         struct dt_device                *dur_dt;
54         /* attached itself to thandle */
55         int                             dur_flags;
56         /* update request result */
57         int                             dur_rc;
58         /* Current batch(transaction) id */
59         __u64                           dur_batchid;
60         /* Holding object updates */
61         struct update_buffer            dur_buf;
62         struct list_head                dur_cb_items;
63 };
64
65 struct update_params {
66         struct object_update_param      up_params[0];
67 };
68
69 static inline size_t update_params_size(const struct update_params *params,
70                                         unsigned int param_count)
71 {
72         struct object_update_param      *param;
73         size_t total_size = sizeof(*params);
74         unsigned int i;
75
76         param = (struct object_update_param *)&params->up_params[0];
77         for (i = 0; i < param_count; i++) {
78                 size_t size = object_update_param_size(param);
79
80                 param = (struct object_update_param *)((char *)param + size);
81                 total_size += size;
82         }
83
84         return total_size;
85 }
86
87 static inline struct object_update_param *
88 update_params_get_param(const struct update_params *params,
89                         unsigned int index, unsigned int param_count)
90 {
91         struct object_update_param *param;
92         unsigned int            i;
93
94         if (index > param_count)
95                 return NULL;
96
97         param = (struct object_update_param *)&params->up_params[0];
98         for (i = 0; i < index; i++)
99                 param = (struct object_update_param *)((char *)param +
100                         object_update_param_size(param));
101
102         return param;
103 }
104
105 static inline void*
106 update_params_get_param_buf(const struct update_params *params, __u16 index,
107                             unsigned int param_count, __u16 *size)
108 {
109         struct object_update_param *param;
110
111         param = update_params_get_param(params, (unsigned int)index,
112                                         param_count);
113         if (param == NULL)
114                 return NULL;
115
116         if (size != NULL)
117                 *size = param->oup_len;
118
119         return &param->oup_buf[0];
120 }
121
122 struct update_op {
123         struct lu_fid uop_fid;
124         __u16   uop_type;
125         __u16   uop_param_count;
126         __u16   uop_params_off[0];
127 };
128
129 static inline size_t
130 update_op_size(unsigned int param_count)
131 {
132         return offsetof(struct update_op, uop_params_off[param_count]);
133 }
134
135 static inline struct update_op *
136 update_op_next_op(const struct update_op *uop)
137 {
138         return (struct update_op *)((char *)uop +
139                                 update_op_size(uop->uop_param_count));
140 }
141
142 /* All of updates in the mulitple_update_record */
143 struct update_ops {
144         struct update_op        uops_op[0];
145 };
146
147 static inline size_t update_ops_size(const struct update_ops *ops,
148                                      unsigned int update_count)
149 {
150         struct update_op *op;
151         size_t total_size = sizeof(*ops);
152         unsigned int i;
153
154         op = (struct update_op *)&ops->uops_op[0];
155         for (i = 0; i < update_count; i++, op = update_op_next_op(op))
156                 total_size += update_op_size(op->uop_param_count);
157
158         return total_size;
159 }
160
161 /*
162  * This is the update record format used to store the updates in
163  * disk. All updates of the operation will be stored in ur_ops.
164  * All of parameters for updates of the operation will be stored
165  * in ur_params.
166  * To save the space of the record, parameters in ur_ops will only
167  * remember their offset in ur_params, so to avoid storing duplicate
168  * parameters in ur_params, which can help us save a lot space for
169  * operation like creating striped directory.
170  */
171 struct update_records {
172         __u64                   ur_master_transno;
173         __u64                   ur_batchid;
174         __u32                   ur_flags;
175         __u32                   ur_param_count;
176         __u32                   ur_update_count;
177         struct update_ops       ur_ops;
178          /* Note ur_ops has a variable size, so comment out
179           * the following ur_params, in case some use it directly
180           * update_records->ur_params
181           *
182           * struct update_params        ur_params;
183           */
184 };
185
186 struct llog_update_record {
187         struct llog_rec_hdr     lur_hdr;
188         struct update_records   lur_update_rec;
189         /* Note ur_update_rec has a variable size, so comment out
190          * the following ur_tail, in case someone use it directly
191          *
192          * struct llog_rec_tail lur_tail;
193          */
194 };
195
196 static inline struct update_params *
197 update_records_get_params(const struct update_records *record)
198 {
199         return (struct update_params *)((char *)record +
200                 offsetof(struct update_records, ur_ops) +
201                 update_ops_size(&record->ur_ops, record->ur_update_count));
202 }
203
204 static inline size_t
205 update_records_size(const struct update_records *record)
206 {
207         struct update_params *params;
208
209         params = update_records_get_params(record);
210
211         return cfs_size_round(offsetof(struct update_records, ur_ops) +
212                update_ops_size(&record->ur_ops, record->ur_update_count) +
213                update_params_size(params, record->ur_param_count));
214 }
215
216 static inline size_t
217 llog_update_record_size(const struct llog_update_record *lur)
218 {
219         return cfs_size_round(sizeof(lur->lur_hdr) +
220                               update_records_size(&lur->lur_update_rec) +
221                               sizeof(struct llog_rec_tail));
222 }
223
224 static inline struct update_op *
225 update_ops_get_op(const struct update_ops *ops, unsigned int index,
226                   unsigned int update_count)
227 {
228         struct update_op *op;
229         unsigned int i;
230
231         if (index > update_count)
232                 return NULL;
233
234         op = (struct update_op *)&ops->uops_op[0];
235         for (i = 0; i < index; i++)
236                 op = update_op_next_op(op);
237
238         return op;
239 }
240
241 static inline void
242 *object_update_param_get(const struct object_update *update, size_t index,
243                          size_t *size)
244 {
245         const struct    object_update_param *param;
246         size_t          i;
247
248         if (index >= update->ou_params_count)
249                 return ERR_PTR(-EINVAL);
250
251         param = &update->ou_params[0];
252         for (i = 0; i < index; i++)
253                 param = (struct object_update_param *)((char *)param +
254                         object_update_param_size(param));
255
256         if (size != NULL)
257                 *size = param->oup_len;
258
259         if (param->oup_len == 0)
260                 return NULL;
261
262         return (void *)&param->oup_buf[0];
263 }
264
265 static inline unsigned long
266 object_update_request_size(const struct object_update_request *our)
267 {
268         unsigned long   size;
269         size_t          i = 0;
270
271         size = offsetof(struct object_update_request, ourq_updates[0]);
272         for (i = 0; i < our->ourq_count; i++) {
273                 struct object_update *update;
274
275                 update = (struct object_update *)((char *)our + size);
276                 size += object_update_size(update);
277         }
278         return size;
279 }
280
281 static inline void
282 object_update_reply_init(struct object_update_reply *reply, size_t count)
283 {
284         reply->ourp_magic = UPDATE_REPLY_MAGIC;
285         reply->ourp_count = count;
286 }
287
288 static inline void
289 object_update_result_insert(struct object_update_reply *reply,
290                             void *data, size_t data_len, size_t index,
291                             int rc)
292 {
293         struct object_update_result *update_result;
294         char *ptr;
295
296         update_result = object_update_result_get(reply, index, NULL);
297         LASSERT(update_result != NULL);
298
299         update_result->our_rc = ptlrpc_status_hton(rc);
300         if (data_len > 0) {
301                 LASSERT(data != NULL);
302                 ptr = (char *)update_result +
303                         cfs_size_round(sizeof(struct object_update_reply));
304                 update_result->our_datalen = data_len;
305                 memcpy(ptr, data, data_len);
306         }
307
308         reply->ourp_lens[index] = cfs_size_round(data_len +
309                                         sizeof(struct object_update_result));
310 }
311
312 static inline int
313 object_update_result_data_get(const struct object_update_reply *reply,
314                               struct lu_buf *lbuf, size_t index)
315 {
316         struct object_update_result *update_result;
317         size_t size = 0;
318         int    result;
319
320         LASSERT(lbuf != NULL);
321         update_result = object_update_result_get(reply, index, &size);
322         if (update_result == NULL ||
323             size < cfs_size_round(sizeof(struct object_update_reply)) ||
324             update_result->our_datalen > size)
325                 RETURN(-EFAULT);
326
327         result = ptlrpc_status_ntoh(update_result->our_rc);
328         if (result < 0)
329                 return result;
330
331         lbuf->lb_buf = update_result->our_data;
332         lbuf->lb_len = update_result->our_datalen;
333
334         return 0;
335 }
336
337 /**
338  * Attached in the thandle to record the updates for distribute
339  * distribution.
340  */
341 struct thandle_update_records {
342         /* All of updates for the cross-MDT operation. */
343         struct llog_update_record       *tur_update_records;
344         size_t                          tur_update_records_buf_size;
345
346         /* All of parameters for the cross-MDT operation */
347         struct update_params    *tur_update_params;
348         unsigned int            tur_update_param_count;
349         size_t                  tur_update_params_buf_size;
350 };
351
352 #define TOP_THANDLE_MAGIC       0x20140917
353 struct top_multiple_thandle {
354         struct dt_device        *tmt_master_sub_dt;
355         atomic_t                tmt_refcount;
356         /* Other sub transactions will be listed here. */
357         struct list_head        tmt_sub_thandle_list;
358
359         struct list_head        tmt_commit_list;
360         /* All of update records will packed here */
361         struct thandle_update_records *tmt_update_records;
362
363         __u64                   tmt_batchid;
364         int                     tmt_result;
365         __u32                   tmt_magic;
366         __u32                   tmt_committed:1;
367 };
368
369 /* {top,sub}_thandle are used to manage distributed transactions which
370  * include updates on several nodes. A top_handle represents the
371  * whole operation, and sub_thandle represents updates on each node. */
372 struct top_thandle {
373         struct thandle          tt_super;
374         /* The master sub transaction. */
375         struct thandle          *tt_master_sub_thandle;
376
377         struct top_multiple_thandle *tt_multiple_thandle;
378 };
379
380 /* Sub thandle is used to track multiple sub thandles under one parent
381  * thandle */
382 struct sub_thandle {
383         struct thandle          *st_sub_th;
384         struct dt_device        *st_dt;
385         struct list_head        st_list;
386         struct llog_cookie      st_cookie;
387         struct dt_txn_commit_cb st_commit_dcb;
388         int                     st_result;
389
390         /* linked to top_thandle */
391         struct list_head        st_sub_list;
392
393         /* If this sub thandle is committed */
394         bool                    st_committed:1;
395 };
396
397 struct tx_arg;
398 typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
399                               struct tx_arg *ta);
400
401 /* Structure for holding one update executation */
402 struct tx_arg {
403         tx_exec_func_t           exec_fn;
404         tx_exec_func_t           undo_fn;
405         struct dt_object        *object;
406         const char              *file;
407         struct object_update_reply *reply;
408         int                      line;
409         int                      index;
410         union {
411                 struct {
412                         struct dt_insert_rec     rec;
413                         const struct dt_key     *key;
414                 } insert;
415                 struct {
416                 } ref;
417                 struct {
418                         struct lu_attr   attr;
419                 } attr_set;
420                 struct {
421                         struct lu_buf    buf;
422                         const char      *name;
423                         int              flags;
424                         __u32            csum;
425                 } xattr_set;
426                 struct {
427                         struct lu_attr                  attr;
428                         struct dt_allocation_hint       hint;
429                         struct dt_object_format         dof;
430                         struct lu_fid                   fid;
431                 } create;
432                 struct {
433                         struct lu_buf   buf;
434                         loff_t          pos;
435                 } write;
436                 struct {
437                         struct ost_body     *body;
438                 } destroy;
439         } u;
440 };
441
442 /* Structure for holding all update executations of one transaction */
443 struct thandle_exec_args {
444         struct thandle          *ta_handle;
445         int                     ta_argno;   /* used args */
446         int                     ta_alloc_args; /* allocated args count */
447         struct tx_arg           **ta_args;
448 };
449
450 /* target/out_lib.c */
451 int out_update_pack(const struct lu_env *env, struct object_update *update,
452                     size_t max_update_size, enum update_type op,
453                     const struct lu_fid *fid, unsigned int params_count,
454                     __u16 *param_sizes, const void **param_bufs);
455 int out_create_pack(const struct lu_env *env, struct object_update *update,
456                     size_t max_update_size, const struct lu_fid *fid,
457                     const struct lu_attr *attr, struct dt_allocation_hint *hint,
458                     struct dt_object_format *dof);
459 int out_object_destroy_pack(const struct lu_env *env,
460                             struct object_update *update,
461                             size_t max_update_size,
462                             const struct lu_fid *fid);
463 int out_index_delete_pack(const struct lu_env *env,
464                           struct object_update *update, size_t max_update_size,
465                           const struct lu_fid *fid, const struct dt_key *key);
466 int out_index_insert_pack(const struct lu_env *env,
467                           struct object_update *update, size_t max_update_size,
468                           const struct lu_fid *fid, const struct dt_rec *rec,
469                           const struct dt_key *key);
470 int out_xattr_set_pack(const struct lu_env *env,
471                        struct object_update *update, size_t max_update_size,
472                        const struct lu_fid *fid, const struct lu_buf *buf,
473                        const char *name, __u32 flag);
474 int out_xattr_del_pack(const struct lu_env *env,
475                        struct object_update *update, size_t max_update_size,
476                        const struct lu_fid *fid, const char *name);
477 int out_attr_set_pack(const struct lu_env *env,
478                       struct object_update *update, size_t max_update_size,
479                       const struct lu_fid *fid, const struct lu_attr *attr);
480 int out_ref_add_pack(const struct lu_env *env,
481                      struct object_update *update, size_t max_update_size,
482                      const struct lu_fid *fid);
483 int out_ref_del_pack(const struct lu_env *env,
484                      struct object_update *update, size_t max_update_size,
485                      const struct lu_fid *fid);
486 int out_write_pack(const struct lu_env *env,
487                    struct object_update *update, size_t max_update_size,
488                    const struct lu_fid *fid, const struct lu_buf *buf,
489                    __u64 pos);
490 int out_attr_get_pack(const struct lu_env *env,
491                       struct object_update *update, size_t max_update_size,
492                       const struct lu_fid *fid);
493 int out_index_lookup_pack(const struct lu_env *env,
494                           struct object_update *update, size_t max_update_size,
495                           const struct lu_fid *fid, struct dt_rec *rec,
496                           const struct dt_key *key);
497 int out_xattr_get_pack(const struct lu_env *env,
498                        struct object_update *update, size_t max_update_size,
499                        const struct lu_fid *fid, const char *name);
500 int out_read_pack(const struct lu_env *env, struct object_update *update,
501                   size_t max_update_length, const struct lu_fid *fid,
502                   size_t size, loff_t pos);
503
504 const char *update_op_str(__u16 opcode);
505
506 /* target/update_trans.c */
507 struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
508                                       struct thandle *th,
509                                       struct dt_device *sub_dt);
510
511 static inline struct thandle *
512 thandle_get_sub(const struct lu_env *env, struct thandle *th,
513                  const struct dt_object *sub_obj)
514 {
515         return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev));
516 }
517
518 struct thandle *
519 top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
520 int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
521                     struct thandle *th);
522 int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
523                    struct thandle *th);
524 void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt);
525
526 static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt)
527 {
528         atomic_inc(&tmt->tmt_refcount);
529 }
530
531 static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt)
532 {
533         if (atomic_dec_and_test(&tmt->tmt_refcount))
534                 top_multiple_thandle_destroy(tmt);
535 }
536
537 struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
538                                        struct dt_device *dt_dev);
539 int sub_thandle_trans_create(const struct lu_env *env,
540                              struct top_thandle *top_th,
541                              struct sub_thandle *st);
542
543 /* update_records.c */
544 int update_records_create_pack(const struct lu_env *env,
545                                struct update_ops *ops,
546                                unsigned int *op_count,
547                                size_t *max_ops_size,
548                                struct update_params *params,
549                                unsigned int *param_count,
550                                size_t *max_param_size,
551                                const struct lu_fid *fid,
552                                const struct lu_attr *attr,
553                                const struct dt_allocation_hint *hint,
554                                struct dt_object_format *dof);
555 int update_records_attr_set_pack(const struct lu_env *env,
556                                  struct update_ops *ops,
557                                  unsigned int *op_count,
558                                  size_t *max_ops_size,
559                                  struct update_params *params,
560                                  unsigned int *param_count,
561                                  size_t *max_param_size,
562                                  const struct lu_fid *fid,
563                                  const struct lu_attr *attr);
564 int update_records_ref_add_pack(const struct lu_env *env,
565                                 struct update_ops *ops,
566                                 unsigned int *op_count,
567                                 size_t *max_ops_size,
568                                 struct update_params *params,
569                                 unsigned int *param_count,
570                                 size_t *max_param_size,
571                                 const struct lu_fid *fid);
572 int update_records_ref_del_pack(const struct lu_env *env,
573                                 struct update_ops *ops,
574                                 unsigned int *op_count,
575                                 size_t *max_ops_size,
576                                 struct update_params *params,
577                                 unsigned int *param_count,
578                                 size_t *max_param_size,
579                                 const struct lu_fid *fid);
580 int update_records_object_destroy_pack(const struct lu_env *env,
581                                        struct update_ops *ops,
582                                        unsigned int *op_count,
583                                        size_t *max_ops_size,
584                                        struct update_params *params,
585                                        unsigned int *param_count,
586                                        size_t *max_param_size,
587                                        const struct lu_fid *fid);
588 int update_records_index_insert_pack(const struct lu_env *env,
589                                      struct update_ops *ops,
590                                      unsigned int *op_count,
591                                      size_t *max_ops_size,
592                                      struct update_params *params,
593                                      unsigned int *param_count,
594                                      size_t *max_param_size,
595                                      const struct lu_fid *fid,
596                                      const struct dt_rec *rec,
597                                      const struct dt_key *key);
598 int update_records_index_delete_pack(const struct lu_env *env,
599                                      struct update_ops *ops,
600                                      unsigned int *op_count,
601                                      size_t *max_ops_size,
602                                      struct update_params *params,
603                                      unsigned int *param_count,
604                                      size_t *max_param_size,
605                                      const struct lu_fid *fid,
606                                      const struct dt_key *key);
607 int update_records_xattr_set_pack(const struct lu_env *env,
608                                   struct update_ops *ops,
609                                   unsigned int *op_count,
610                                   size_t *max_ops_size,
611                                   struct update_params *params,
612                                   unsigned int *param_count,
613                                   size_t *max_param_size,
614                                   const struct lu_fid *fid,
615                                   const struct lu_buf *buf, const char *name,
616                                   __u32 flag);
617 int update_records_xattr_del_pack(const struct lu_env *env,
618                                   struct update_ops *ops,
619                                   unsigned int *op_count,
620                                   size_t *max_ops_size,
621                                   struct update_params *params,
622                                   unsigned int *param_count,
623                                   size_t *max_param_size,
624                                   const struct lu_fid *fid,
625                                   const char *name);
626 int update_records_write_pack(const struct lu_env *env,
627                               struct update_ops *ops,
628                               unsigned int *op_count,
629                               size_t *max_ops_size,
630                               struct update_params *params,
631                               unsigned int *param_count,
632                               size_t *max_param_size,
633                               const struct lu_fid *fid,
634                               const struct lu_buf *buf,
635                               __u64 pos);
636 int update_records_punch_pack(const struct lu_env *env,
637                               struct update_ops *ops,
638                               unsigned int *op_count,
639                               size_t *max_ops_size,
640                               struct update_params *params,
641                               unsigned int *param_count,
642                               size_t *max_param_size,
643                               const struct lu_fid *fid,
644                               __u64 start, __u64 end);
645
646 int tur_update_records_extend(struct thandle_update_records *tur,
647                               size_t new_size);
648 int tur_update_params_extend(struct thandle_update_records *tur,
649                              size_t new_size);
650 int tur_update_extend(struct thandle_update_records *tur,
651                       size_t new_op_size, size_t new_param_size);
652
653 #define update_record_pack(name, th, ...)                               \
654 ({                                                                      \
655         struct top_thandle *top_th;                                     \
656         struct top_multiple_thandle *tmt;                               \
657         struct thandle_update_records *tur;                             \
658         struct llog_update_record     *lur;                             \
659         size_t          avail_param_size;                               \
660         size_t          avail_op_size;                                  \
661         int             ret;                                            \
662                                                                         \
663         while (1) {                                                     \
664                 top_th = container_of(th, struct top_thandle, tt_super);\
665                 tmt = top_th->tt_multiple_thandle;                      \
666                 tur = tmt->tmt_update_records;                          \
667                 lur = tur->tur_update_records;                          \
668                 avail_param_size = tur->tur_update_params_buf_size -    \
669                              update_params_size(tur->tur_update_params, \
670                                         tur->tur_update_param_count);   \
671                 avail_op_size = tur->tur_update_records_buf_size -      \
672                                 llog_update_record_size(lur);           \
673                 ret = update_records_##name##_pack(env,                 \
674                                           &lur->lur_update_rec.ur_ops,  \
675                                   &lur->lur_update_rec.ur_update_count, \
676                                   &avail_op_size,                       \
677                                   tur->tur_update_params,               \
678                                   &tur->tur_update_param_count,         \
679                                   &avail_param_size, __VA_ARGS__);      \
680                 if (ret == -E2BIG) {                                    \
681                         ret = tur_update_extend(tur, avail_op_size,     \
682                                                    avail_param_size);   \
683                         if (ret != 0)                                   \
684                                 break;                                  \
685                         continue;                                       \
686                 } else {                                                \
687                         break;                                          \
688                 }                                                       \
689         }                                                               \
690         ret;                                                            \
691 })
692 #endif