Whamcloud - gitweb
LU-3536 lod: record update for cross-MDT operation
[fs/lustre-release.git] / lustre / include / lustre_update.h
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.htm
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/include/lustre_update.h
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  */
30
31 #ifndef _LUSTRE_UPDATE_H
32 #define _LUSTRE_UPDATE_H
33 #include <lustre_net.h>
34 #include <dt_object.h>
35
36 #define OUT_UPDATE_INIT_BUFFER_SIZE     4096
37 #define OUT_UPDATE_REPLY_SIZE           8192
38
39 struct dt_key;
40 struct dt_rec;
41 struct object_update_param;
42
43 struct update_buffer {
44         struct object_update_request    *ub_req;
45         size_t                          ub_req_size;
46 };
47
48 /**
49  * Tracking the updates being executed on this dt_device.
50  */
51 struct dt_update_request {
52         struct dt_device                *dur_dt;
53         /* attached itself to thandle */
54         int                             dur_flags;
55         /* update request result */
56         int                             dur_rc;
57         /* Current batch(transaction) id */
58         __u64                           dur_batchid;
59         /* Holding object updates */
60         struct update_buffer            dur_buf;
61         struct list_head                dur_cb_items;
62 };
63
64 struct update_params {
65         struct object_update_param      up_params[0];
66 };
67
68 static inline size_t update_params_size(const struct update_params *params,
69                                         unsigned int param_count)
70 {
71         struct object_update_param      *param;
72         size_t total_size = sizeof(*params);
73         unsigned int i;
74
75         param = (struct object_update_param *)&params->up_params[0];
76         for (i = 0; i < param_count; i++) {
77                 size_t size = object_update_param_size(param);
78
79                 param = (struct object_update_param *)((char *)param + size);
80                 total_size += size;
81         }
82
83         return total_size;
84 }
85
86 static inline struct object_update_param *
87 update_params_get_param(const struct update_params *params,
88                         unsigned int index, unsigned int param_count)
89 {
90         struct object_update_param *param;
91         unsigned int            i;
92
93         if (index > param_count)
94                 return NULL;
95
96         param = (struct object_update_param *)&params->up_params[0];
97         for (i = 0; i < index; i++)
98                 param = (struct object_update_param *)((char *)param +
99                         object_update_param_size(param));
100
101         return param;
102 }
103
104 struct update_op {
105         struct lu_fid uop_fid;
106         __u16   uop_type;
107         __u16   uop_param_count;
108         __u16   uop_params_off[0];
109 };
110
111 static inline size_t
112 update_op_size(unsigned int param_count)
113 {
114         return offsetof(struct update_op, uop_params_off[param_count]);
115 }
116
117 static inline struct update_op *
118 update_op_next_op(const struct update_op *uop)
119 {
120         return (struct update_op *)((char *)uop +
121                                 update_op_size(uop->uop_param_count));
122 }
123
124 /* All of updates in the mulitple_update_record */
125 struct update_ops {
126         struct update_op        uops_op[0];
127 };
128
129 static inline size_t update_ops_size(const struct update_ops *ops,
130                                      unsigned int update_count)
131 {
132         struct update_op *op;
133         size_t total_size = sizeof(*ops);
134         unsigned int i;
135
136         op = (struct update_op *)&ops->uops_op[0];
137         for (i = 0; i < update_count; i++, op = update_op_next_op(op))
138                 total_size += update_op_size(op->uop_param_count);
139
140         return total_size;
141 }
142
143 /*
144  * This is the update record format used to store the updates in
145  * disk. All updates of the operation will be stored in ur_ops.
146  * All of parameters for updates of the operation will be stored
147  * in ur_params.
148  * To save the space of the record, parameters in ur_ops will only
149  * remember their offset in ur_params, so to avoid storing duplicate
150  * parameters in ur_params, which can help us save a lot space for
151  * operation like creating striped directory.
152  */
153 struct update_records {
154         __u64                   ur_master_transno;
155         __u64                   ur_batchid;
156         __u32                   ur_flags;
157         __u32                   ur_param_count;
158         __u32                   ur_update_count;
159         struct update_ops       ur_ops;
160          /* Note ur_ops has a variable size, so comment out
161           * the following ur_params, in case some use it directly
162           * update_records->ur_params
163           *
164           * struct update_params        ur_params;
165           */
166 };
167
168 struct llog_update_record {
169         struct llog_rec_hdr     lur_hdr;
170         struct update_records   lur_update_rec;
171         /* Note ur_update_rec has a variable size, so comment out
172          * the following ur_tail, in case someone use it directly
173          *
174          * struct llog_rec_tail lur_tail;
175          */
176 };
177
178 static inline struct update_params *
179 update_records_get_params(const struct update_records *record)
180 {
181         return (struct update_params *)((char *)record +
182                 offsetof(struct update_records, ur_ops) +
183                 update_ops_size(&record->ur_ops, record->ur_update_count));
184 }
185
186 static inline size_t
187 update_records_size(const struct update_records *record)
188 {
189         struct update_params *params;
190
191         params = update_records_get_params(record);
192
193         return cfs_size_round(offsetof(struct update_records, ur_ops) +
194                update_ops_size(&record->ur_ops, record->ur_update_count) +
195                update_params_size(params, record->ur_param_count));
196 }
197
198 static inline size_t
199 llog_update_record_size(const struct llog_update_record *lur)
200 {
201         return cfs_size_round(sizeof(lur->lur_hdr) +
202                               update_records_size(&lur->lur_update_rec) +
203                               sizeof(struct llog_rec_tail));
204 }
205
206 static inline struct update_op *
207 update_ops_get_op(const struct update_ops *ops, unsigned int index,
208                   unsigned int update_count)
209 {
210         struct update_op *op;
211         unsigned int i;
212
213         if (index > update_count)
214                 return NULL;
215
216         op = (struct update_op *)&ops->uops_op[0];
217         for (i = 0; i < index; i++)
218                 op = update_op_next_op(op);
219
220         return op;
221 }
222
223 static inline void
224 *object_update_param_get(const struct object_update *update, size_t index,
225                          size_t *size)
226 {
227         const struct    object_update_param *param;
228         size_t          i;
229
230         if (index >= update->ou_params_count)
231                 return ERR_PTR(-EINVAL);
232
233         param = &update->ou_params[0];
234         for (i = 0; i < index; i++)
235                 param = (struct object_update_param *)((char *)param +
236                         object_update_param_size(param));
237
238         if (size != NULL)
239                 *size = param->oup_len;
240
241         if (param->oup_len == 0)
242                 return NULL;
243
244         return (void *)&param->oup_buf[0];
245 }
246
247 static inline unsigned long
248 object_update_request_size(const struct object_update_request *our)
249 {
250         unsigned long   size;
251         size_t          i = 0;
252
253         size = offsetof(struct object_update_request, ourq_updates[0]);
254         for (i = 0; i < our->ourq_count; i++) {
255                 struct object_update *update;
256
257                 update = (struct object_update *)((char *)our + size);
258                 size += object_update_size(update);
259         }
260         return size;
261 }
262
263 static inline void
264 object_update_reply_init(struct object_update_reply *reply, size_t count)
265 {
266         reply->ourp_magic = UPDATE_REPLY_MAGIC;
267         reply->ourp_count = count;
268 }
269
270 static inline void
271 object_update_result_insert(struct object_update_reply *reply,
272                             void *data, size_t data_len, size_t index,
273                             int rc)
274 {
275         struct object_update_result *update_result;
276         char *ptr;
277
278         update_result = object_update_result_get(reply, index, NULL);
279         LASSERT(update_result != NULL);
280
281         update_result->our_rc = ptlrpc_status_hton(rc);
282         if (data_len > 0) {
283                 LASSERT(data != NULL);
284                 ptr = (char *)update_result +
285                         cfs_size_round(sizeof(struct object_update_reply));
286                 update_result->our_datalen = data_len;
287                 memcpy(ptr, data, data_len);
288         }
289
290         reply->ourp_lens[index] = cfs_size_round(data_len +
291                                         sizeof(struct object_update_result));
292 }
293
294 static inline int
295 object_update_result_data_get(const struct object_update_reply *reply,
296                               struct lu_buf *lbuf, size_t index)
297 {
298         struct object_update_result *update_result;
299         size_t size = 0;
300         int    result;
301
302         LASSERT(lbuf != NULL);
303         update_result = object_update_result_get(reply, index, &size);
304         if (update_result == NULL ||
305             size < cfs_size_round(sizeof(struct object_update_reply)) ||
306             update_result->our_datalen > size)
307                 RETURN(-EFAULT);
308
309         result = ptlrpc_status_ntoh(update_result->our_rc);
310         if (result < 0)
311                 return result;
312
313         lbuf->lb_buf = update_result->our_data;
314         lbuf->lb_len = update_result->our_datalen;
315
316         return 0;
317 }
318
319 /**
320  * Attached in the thandle to record the updates for distribute
321  * distribution.
322  */
323 struct thandle_update_records {
324         /* All of updates for the cross-MDT operation. */
325         struct llog_update_record       *tur_update_records;
326         size_t                          tur_update_records_buf_size;
327
328         /* All of parameters for the cross-MDT operation */
329         struct update_params    *tur_update_params;
330         unsigned int            tur_update_param_count;
331         size_t                  tur_update_params_buf_size;
332 };
333
334 #define TOP_THANDLE_MAGIC       0x20140917
335 /* {top,sub}_thandle are used to manage distributed transactions which
336  * include updates on several nodes. A top_handle represents the
337  * whole operation, and sub_thandle represents updates on each node. */
338 struct top_thandle {
339         struct thandle          tt_super;
340         __u32                   tt_magic;
341         atomic_t                tt_refcount;
342         /* The master sub transaction. */
343         struct thandle          *tt_master_sub_thandle;
344
345         /* Other sub thandle will be listed here. */
346         struct list_head        tt_sub_thandle_list;
347
348         /* All of update records will packed here */
349         struct thandle_update_records *tt_update_records;
350 };
351
352 struct sub_thandle {
353         /* point to the osd/osp_thandle */
354         struct thandle          *st_sub_th;
355
356         /* linked to top_thandle */
357         struct list_head        st_sub_list;
358
359         /* If this sub thandle is committed */
360         bool                    st_committed:1,
361                                 st_record_update:1;
362 };
363
364
365 /* target/out_lib.c */
366 int out_update_pack(const struct lu_env *env, struct object_update *update,
367                     size_t max_update_size, enum update_type op,
368                     const struct lu_fid *fid, unsigned int params_count,
369                     __u16 *param_sizes, const void **param_bufs);
370 int out_create_pack(const struct lu_env *env, struct object_update *update,
371                     size_t max_update_size, const struct lu_fid *fid,
372                     const struct lu_attr *attr, struct dt_allocation_hint *hint,
373                     struct dt_object_format *dof);
374 int out_object_destroy_pack(const struct lu_env *env,
375                             struct object_update *update,
376                             size_t max_update_size,
377                             const struct lu_fid *fid);
378 int out_index_delete_pack(const struct lu_env *env,
379                           struct object_update *update, size_t max_update_size,
380                           const struct lu_fid *fid, const struct dt_key *key);
381 int out_index_insert_pack(const struct lu_env *env,
382                           struct object_update *update, size_t max_update_size,
383                           const struct lu_fid *fid, const struct dt_rec *rec,
384                           const struct dt_key *key);
385 int out_xattr_set_pack(const struct lu_env *env,
386                        struct object_update *update, size_t max_update_size,
387                        const struct lu_fid *fid, const struct lu_buf *buf,
388                        const char *name, __u32 flag);
389 int out_xattr_del_pack(const struct lu_env *env,
390                        struct object_update *update, size_t max_update_size,
391                        const struct lu_fid *fid, const char *name);
392 int out_attr_set_pack(const struct lu_env *env,
393                       struct object_update *update, size_t max_update_size,
394                       const struct lu_fid *fid, const struct lu_attr *attr);
395 int out_ref_add_pack(const struct lu_env *env,
396                      struct object_update *update, size_t max_update_size,
397                      const struct lu_fid *fid);
398 int out_ref_del_pack(const struct lu_env *env,
399                      struct object_update *update, size_t max_update_size,
400                      const struct lu_fid *fid);
401 int out_write_pack(const struct lu_env *env,
402                    struct object_update *update, size_t max_update_size,
403                    const struct lu_fid *fid, const struct lu_buf *buf,
404                    __u64 pos);
405 int out_attr_get_pack(const struct lu_env *env,
406                       struct object_update *update, size_t max_update_size,
407                       const struct lu_fid *fid);
408 int out_index_lookup_pack(const struct lu_env *env,
409                           struct object_update *update, size_t max_update_size,
410                           const struct lu_fid *fid, struct dt_rec *rec,
411                           const struct dt_key *key);
412 int out_xattr_get_pack(const struct lu_env *env,
413                        struct object_update *update, size_t max_update_size,
414                        const struct lu_fid *fid, const char *name);
415
416 const char *update_op_str(__u16 opcode);
417
418 /* target/update_trans.c */
419 struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
420                                       struct thandle *th,
421                                       struct dt_device *sub_dt);
422
423 static inline struct thandle *
424 thandle_get_sub(const struct lu_env *env, struct thandle *th,
425                  const struct dt_object *sub_obj)
426 {
427         return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev));
428 }
429
430 struct thandle *
431 top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
432
433 int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
434                     struct thandle *th);
435
436 int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
437                    struct thandle *th);
438
439 void top_thandle_destroy(struct top_thandle *top_th);
440
441 /* update_records.c */
442 int update_records_create_pack(const struct lu_env *env,
443                                struct update_ops *ops,
444                                unsigned int *op_count,
445                                size_t *max_ops_size,
446                                struct update_params *params,
447                                unsigned int *param_count,
448                                size_t *max_param_size,
449                                const struct lu_fid *fid,
450                                const struct lu_attr *attr,
451                                const struct dt_allocation_hint *hint,
452                                struct dt_object_format *dof);
453 int update_records_attr_set_pack(const struct lu_env *env,
454                                  struct update_ops *ops,
455                                  unsigned int *op_count,
456                                  size_t *max_ops_size,
457                                  struct update_params *params,
458                                  unsigned int *param_count,
459                                  size_t *max_param_size,
460                                  const struct lu_fid *fid,
461                                  const struct lu_attr *attr);
462 int update_records_ref_add_pack(const struct lu_env *env,
463                                 struct update_ops *ops,
464                                 unsigned int *op_count,
465                                 size_t *max_ops_size,
466                                 struct update_params *params,
467                                 unsigned int *param_count,
468                                 size_t *max_param_size,
469                                 const struct lu_fid *fid);
470 int update_records_ref_del_pack(const struct lu_env *env,
471                                 struct update_ops *ops,
472                                 unsigned int *op_count,
473                                 size_t *max_ops_size,
474                                 struct update_params *params,
475                                 unsigned int *param_count,
476                                 size_t *max_param_size,
477                                 const struct lu_fid *fid);
478 int update_records_object_destroy_pack(const struct lu_env *env,
479                                        struct update_ops *ops,
480                                        unsigned int *op_count,
481                                        size_t *max_ops_size,
482                                        struct update_params *params,
483                                        unsigned int *param_count,
484                                        size_t *max_param_size,
485                                        const struct lu_fid *fid);
486 int update_records_index_insert_pack(const struct lu_env *env,
487                                      struct update_ops *ops,
488                                      unsigned int *op_count,
489                                      size_t *max_ops_size,
490                                      struct update_params *params,
491                                      unsigned int *param_count,
492                                      size_t *max_param_size,
493                                      const struct lu_fid *fid,
494                                      const struct dt_rec *rec,
495                                      const struct dt_key *key);
496 int update_records_index_delete_pack(const struct lu_env *env,
497                                      struct update_ops *ops,
498                                      unsigned int *op_count,
499                                      size_t *max_ops_size,
500                                      struct update_params *params,
501                                      unsigned int *param_count,
502                                      size_t *max_param_size,
503                                      const struct lu_fid *fid,
504                                      const struct dt_key *key);
505 int update_records_xattr_set_pack(const struct lu_env *env,
506                                   struct update_ops *ops,
507                                   unsigned int *op_count,
508                                   size_t *max_ops_size,
509                                   struct update_params *params,
510                                   unsigned int *param_count,
511                                   size_t *max_param_size,
512                                   const struct lu_fid *fid,
513                                   const struct lu_buf *buf, const char *name,
514                                   __u32 flag);
515 int update_records_xattr_del_pack(const struct lu_env *env,
516                                   struct update_ops *ops,
517                                   unsigned int *op_count,
518                                   size_t *max_ops_size,
519                                   struct update_params *params,
520                                   unsigned int *param_count,
521                                   size_t *max_param_size,
522                                   const struct lu_fid *fid,
523                                   const char *name);
524 int update_records_write_pack(const struct lu_env *env,
525                               struct update_ops *ops,
526                               unsigned int *op_count,
527                               size_t *max_ops_size,
528                               struct update_params *params,
529                               unsigned int *param_count,
530                               size_t *max_param_size,
531                               const struct lu_fid *fid,
532                               const struct lu_buf *buf,
533                               __u64 pos);
534
535 int tur_update_records_extend(struct thandle_update_records *tur,
536                               size_t new_size);
537 int tur_update_params_extend(struct thandle_update_records *tur,
538                              size_t new_size);
539 int check_and_prepare_update_record(const struct lu_env *env,
540                                     struct thandle *th);
541 int merge_params_updates_buf(const struct lu_env *env,
542                              struct thandle_update_records *tur);
543 int tur_update_extend(struct thandle_update_records *tur,
544                       size_t new_op_size, size_t new_param_size);
545
546 #define update_record_pack(name, th, ...)                               \
547 ({                                                                      \
548         struct top_thandle *top_th;                                     \
549         struct thandle_update_records *tur;                             \
550         struct llog_update_record     *lur;                             \
551         size_t          avail_param_size;                               \
552         size_t          avail_op_size;                                  \
553         int             ret;                                            \
554                                                                         \
555         while (1) {                                                     \
556                 top_th = container_of(th, struct top_thandle, tt_super);\
557                 tur = top_th->tt_update_records;                        \
558                 lur = tur->tur_update_records;                          \
559                 avail_param_size = tur->tur_update_params_buf_size -    \
560                              update_params_size(tur->tur_update_params, \
561                                         tur->tur_update_param_count);   \
562                 avail_op_size = tur->tur_update_records_buf_size -      \
563                                 llog_update_record_size(lur);           \
564                 ret = update_records_##name##_pack(env,                 \
565                                           &lur->lur_update_rec.ur_ops,  \
566                                   &lur->lur_update_rec.ur_update_count, \
567                                   &avail_op_size,                       \
568                                   tur->tur_update_params,               \
569                                   &tur->tur_update_param_count,         \
570                                   &avail_param_size, __VA_ARGS__);      \
571                 if (ret == -E2BIG) {                                    \
572                         ret = tur_update_extend(tur, avail_op_size,     \
573                                                    avail_param_size);   \
574                         if (ret != 0)                                   \
575                                 break;                                  \
576                         continue;                                       \
577                 } else {                                                \
578                         break;                                          \
579                 }                                                       \
580         }                                                               \
581         ret;                                                            \
582 })
583 #endif