Whamcloud - gitweb
8a42fb8647cad8c4b8b54abbf22434f2887e275e
[fs/lustre-release.git] / lustre / include / lustre_update.h
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.htm
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2013, 2014, Intel Corporation.
24  */
25 /*
26  * lustre/include/lustre_update.h
27  *
28  * Author: Di Wang <di.wang@intel.com>
29  */
30
31 #ifndef _LUSTRE_UPDATE_H
32 #define _LUSTRE_UPDATE_H
33 #include <lustre_net.h>
34 #include <dt_object.h>
35
36 #define OUT_UPDATE_INIT_BUFFER_SIZE     4096
37 /* 16KB, the current biggest size is llog header(8KB) */
38 #define OUT_UPDATE_REPLY_SIZE           16384
39
40 struct dt_key;
41 struct dt_rec;
42 struct object_update_param;
43
44 struct update_buffer {
45         struct object_update_request    *ub_req;
46         size_t                          ub_req_size;
47 };
48
49 /**
50  * Tracking the updates being executed on this dt_device.
51  */
52 struct dt_update_request {
53         struct dt_device                *dur_dt;
54         /* attached itself to thandle */
55         int                             dur_flags;
56         /* update request result */
57         int                             dur_rc;
58         /* Current batch(transaction) id */
59         __u64                           dur_batchid;
60         /* Holding object updates */
61         struct update_buffer            dur_buf;
62         struct list_head                dur_cb_items;
63 };
64
65 struct update_params {
66         struct object_update_param      up_params[0];
67 };
68
69 static inline size_t update_params_size(const struct update_params *params,
70                                         unsigned int param_count)
71 {
72         struct object_update_param      *param;
73         size_t total_size = sizeof(*params);
74         unsigned int i;
75
76         param = (struct object_update_param *)&params->up_params[0];
77         for (i = 0; i < param_count; i++) {
78                 size_t size = object_update_param_size(param);
79
80                 param = (struct object_update_param *)((char *)param + size);
81                 total_size += size;
82         }
83
84         return total_size;
85 }
86
87 static inline struct object_update_param *
88 update_params_get_param(const struct update_params *params,
89                         unsigned int index, unsigned int param_count)
90 {
91         struct object_update_param *param;
92         unsigned int            i;
93
94         if (index > param_count)
95                 return NULL;
96
97         param = (struct object_update_param *)&params->up_params[0];
98         for (i = 0; i < index; i++)
99                 param = (struct object_update_param *)((char *)param +
100                         object_update_param_size(param));
101
102         return param;
103 }
104
105 struct update_op {
106         struct lu_fid uop_fid;
107         __u16   uop_type;
108         __u16   uop_param_count;
109         __u16   uop_params_off[0];
110 };
111
112 static inline size_t
113 update_op_size(unsigned int param_count)
114 {
115         return offsetof(struct update_op, uop_params_off[param_count]);
116 }
117
118 static inline struct update_op *
119 update_op_next_op(const struct update_op *uop)
120 {
121         return (struct update_op *)((char *)uop +
122                                 update_op_size(uop->uop_param_count));
123 }
124
125 /* All of updates in the mulitple_update_record */
126 struct update_ops {
127         struct update_op        uops_op[0];
128 };
129
130 static inline size_t update_ops_size(const struct update_ops *ops,
131                                      unsigned int update_count)
132 {
133         struct update_op *op;
134         size_t total_size = sizeof(*ops);
135         unsigned int i;
136
137         op = (struct update_op *)&ops->uops_op[0];
138         for (i = 0; i < update_count; i++, op = update_op_next_op(op))
139                 total_size += update_op_size(op->uop_param_count);
140
141         return total_size;
142 }
143
144 /*
145  * This is the update record format used to store the updates in
146  * disk. All updates of the operation will be stored in ur_ops.
147  * All of parameters for updates of the operation will be stored
148  * in ur_params.
149  * To save the space of the record, parameters in ur_ops will only
150  * remember their offset in ur_params, so to avoid storing duplicate
151  * parameters in ur_params, which can help us save a lot space for
152  * operation like creating striped directory.
153  */
154 struct update_records {
155         __u64                   ur_master_transno;
156         __u64                   ur_batchid;
157         __u32                   ur_flags;
158         __u32                   ur_param_count;
159         __u32                   ur_update_count;
160         struct update_ops       ur_ops;
161          /* Note ur_ops has a variable size, so comment out
162           * the following ur_params, in case some use it directly
163           * update_records->ur_params
164           *
165           * struct update_params        ur_params;
166           */
167 };
168
169 struct llog_update_record {
170         struct llog_rec_hdr     lur_hdr;
171         struct update_records   lur_update_rec;
172         /* Note ur_update_rec has a variable size, so comment out
173          * the following ur_tail, in case someone use it directly
174          *
175          * struct llog_rec_tail lur_tail;
176          */
177 };
178
179 static inline struct update_params *
180 update_records_get_params(const struct update_records *record)
181 {
182         return (struct update_params *)((char *)record +
183                 offsetof(struct update_records, ur_ops) +
184                 update_ops_size(&record->ur_ops, record->ur_update_count));
185 }
186
187 static inline size_t
188 update_records_size(const struct update_records *record)
189 {
190         struct update_params *params;
191
192         params = update_records_get_params(record);
193
194         return cfs_size_round(offsetof(struct update_records, ur_ops) +
195                update_ops_size(&record->ur_ops, record->ur_update_count) +
196                update_params_size(params, record->ur_param_count));
197 }
198
199 static inline size_t
200 llog_update_record_size(const struct llog_update_record *lur)
201 {
202         return cfs_size_round(sizeof(lur->lur_hdr) +
203                               update_records_size(&lur->lur_update_rec) +
204                               sizeof(struct llog_rec_tail));
205 }
206
207 static inline struct update_op *
208 update_ops_get_op(const struct update_ops *ops, unsigned int index,
209                   unsigned int update_count)
210 {
211         struct update_op *op;
212         unsigned int i;
213
214         if (index > update_count)
215                 return NULL;
216
217         op = (struct update_op *)&ops->uops_op[0];
218         for (i = 0; i < index; i++)
219                 op = update_op_next_op(op);
220
221         return op;
222 }
223
224 static inline void
225 *object_update_param_get(const struct object_update *update, size_t index,
226                          size_t *size)
227 {
228         const struct    object_update_param *param;
229         size_t          i;
230
231         if (index >= update->ou_params_count)
232                 return ERR_PTR(-EINVAL);
233
234         param = &update->ou_params[0];
235         for (i = 0; i < index; i++)
236                 param = (struct object_update_param *)((char *)param +
237                         object_update_param_size(param));
238
239         if (size != NULL)
240                 *size = param->oup_len;
241
242         if (param->oup_len == 0)
243                 return NULL;
244
245         return (void *)&param->oup_buf[0];
246 }
247
248 static inline unsigned long
249 object_update_request_size(const struct object_update_request *our)
250 {
251         unsigned long   size;
252         size_t          i = 0;
253
254         size = offsetof(struct object_update_request, ourq_updates[0]);
255         for (i = 0; i < our->ourq_count; i++) {
256                 struct object_update *update;
257
258                 update = (struct object_update *)((char *)our + size);
259                 size += object_update_size(update);
260         }
261         return size;
262 }
263
264 static inline void
265 object_update_reply_init(struct object_update_reply *reply, size_t count)
266 {
267         reply->ourp_magic = UPDATE_REPLY_MAGIC;
268         reply->ourp_count = count;
269 }
270
271 static inline void
272 object_update_result_insert(struct object_update_reply *reply,
273                             void *data, size_t data_len, size_t index,
274                             int rc)
275 {
276         struct object_update_result *update_result;
277         char *ptr;
278
279         update_result = object_update_result_get(reply, index, NULL);
280         LASSERT(update_result != NULL);
281
282         update_result->our_rc = ptlrpc_status_hton(rc);
283         if (data_len > 0) {
284                 LASSERT(data != NULL);
285                 ptr = (char *)update_result +
286                         cfs_size_round(sizeof(struct object_update_reply));
287                 update_result->our_datalen = data_len;
288                 memcpy(ptr, data, data_len);
289         }
290
291         reply->ourp_lens[index] = cfs_size_round(data_len +
292                                         sizeof(struct object_update_result));
293 }
294
295 static inline int
296 object_update_result_data_get(const struct object_update_reply *reply,
297                               struct lu_buf *lbuf, size_t index)
298 {
299         struct object_update_result *update_result;
300         size_t size = 0;
301         int    result;
302
303         LASSERT(lbuf != NULL);
304         update_result = object_update_result_get(reply, index, &size);
305         if (update_result == NULL ||
306             size < cfs_size_round(sizeof(struct object_update_reply)) ||
307             update_result->our_datalen > size)
308                 RETURN(-EFAULT);
309
310         result = ptlrpc_status_ntoh(update_result->our_rc);
311         if (result < 0)
312                 return result;
313
314         lbuf->lb_buf = update_result->our_data;
315         lbuf->lb_len = update_result->our_datalen;
316
317         return 0;
318 }
319
320 /**
321  * Attached in the thandle to record the updates for distribute
322  * distribution.
323  */
324 struct thandle_update_records {
325         /* All of updates for the cross-MDT operation. */
326         struct llog_update_record       *tur_update_records;
327         size_t                          tur_update_records_buf_size;
328
329         /* All of parameters for the cross-MDT operation */
330         struct update_params    *tur_update_params;
331         unsigned int            tur_update_param_count;
332         size_t                  tur_update_params_buf_size;
333 };
334
335 #define TOP_THANDLE_MAGIC       0x20140917
336 struct top_multiple_thandle {
337         struct dt_device        *tmt_master_sub_dt;
338         atomic_t                tmt_refcount;
339         /* Other sub transactions will be listed here. */
340         struct list_head        tmt_sub_thandle_list;
341
342         struct list_head        tmt_commit_list;
343         /* All of update records will packed here */
344         struct thandle_update_records *tmt_update_records;
345
346         __u64                   tmt_batchid;
347         int                     tmt_result;
348         __u32                   tmt_magic;
349         __u32                   tmt_committed:1;
350 };
351
352 /* {top,sub}_thandle are used to manage distributed transactions which
353  * include updates on several nodes. A top_handle represents the
354  * whole operation, and sub_thandle represents updates on each node. */
355 struct top_thandle {
356         struct thandle          tt_super;
357         /* The master sub transaction. */
358         struct thandle          *tt_master_sub_thandle;
359
360         struct top_multiple_thandle *tt_multiple_thandle;
361 };
362
363 /* Sub thandle is used to track multiple sub thandles under one parent
364  * thandle */
365 struct sub_thandle {
366         struct thandle          *st_sub_th;
367         struct dt_device        *st_dt;
368         struct list_head        st_list;
369         struct llog_cookie      st_cookie;
370         struct dt_txn_commit_cb st_commit_dcb;
371         int                     st_result;
372
373         /* linked to top_thandle */
374         struct list_head        st_sub_list;
375
376         /* If this sub thandle is committed */
377         bool                    st_committed:1;
378 };
379
380 /* target/out_lib.c */
381 int out_update_pack(const struct lu_env *env, struct object_update *update,
382                     size_t max_update_size, enum update_type op,
383                     const struct lu_fid *fid, unsigned int params_count,
384                     __u16 *param_sizes, const void **param_bufs);
385 int out_create_pack(const struct lu_env *env, struct object_update *update,
386                     size_t max_update_size, const struct lu_fid *fid,
387                     const struct lu_attr *attr, struct dt_allocation_hint *hint,
388                     struct dt_object_format *dof);
389 int out_object_destroy_pack(const struct lu_env *env,
390                             struct object_update *update,
391                             size_t max_update_size,
392                             const struct lu_fid *fid);
393 int out_index_delete_pack(const struct lu_env *env,
394                           struct object_update *update, size_t max_update_size,
395                           const struct lu_fid *fid, const struct dt_key *key);
396 int out_index_insert_pack(const struct lu_env *env,
397                           struct object_update *update, size_t max_update_size,
398                           const struct lu_fid *fid, const struct dt_rec *rec,
399                           const struct dt_key *key);
400 int out_xattr_set_pack(const struct lu_env *env,
401                        struct object_update *update, size_t max_update_size,
402                        const struct lu_fid *fid, const struct lu_buf *buf,
403                        const char *name, __u32 flag);
404 int out_xattr_del_pack(const struct lu_env *env,
405                        struct object_update *update, size_t max_update_size,
406                        const struct lu_fid *fid, const char *name);
407 int out_attr_set_pack(const struct lu_env *env,
408                       struct object_update *update, size_t max_update_size,
409                       const struct lu_fid *fid, const struct lu_attr *attr);
410 int out_ref_add_pack(const struct lu_env *env,
411                      struct object_update *update, size_t max_update_size,
412                      const struct lu_fid *fid);
413 int out_ref_del_pack(const struct lu_env *env,
414                      struct object_update *update, size_t max_update_size,
415                      const struct lu_fid *fid);
416 int out_write_pack(const struct lu_env *env,
417                    struct object_update *update, size_t max_update_size,
418                    const struct lu_fid *fid, const struct lu_buf *buf,
419                    __u64 pos);
420 int out_attr_get_pack(const struct lu_env *env,
421                       struct object_update *update, size_t max_update_size,
422                       const struct lu_fid *fid);
423 int out_index_lookup_pack(const struct lu_env *env,
424                           struct object_update *update, size_t max_update_size,
425                           const struct lu_fid *fid, struct dt_rec *rec,
426                           const struct dt_key *key);
427 int out_xattr_get_pack(const struct lu_env *env,
428                        struct object_update *update, size_t max_update_size,
429                        const struct lu_fid *fid, const char *name);
430 int out_read_pack(const struct lu_env *env, struct object_update *update,
431                   size_t max_update_length, const struct lu_fid *fid,
432                   size_t size, loff_t pos);
433
434 const char *update_op_str(__u16 opcode);
435
436 /* target/update_trans.c */
437 struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
438                                       struct thandle *th,
439                                       struct dt_device *sub_dt);
440
441 static inline struct thandle *
442 thandle_get_sub(const struct lu_env *env, struct thandle *th,
443                  const struct dt_object *sub_obj)
444 {
445         return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev));
446 }
447
448 struct thandle *
449 top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
450 int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
451                     struct thandle *th);
452 int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
453                    struct thandle *th);
454 void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt);
455
456 static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt)
457 {
458         atomic_inc(&tmt->tmt_refcount);
459 }
460
461 static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt)
462 {
463         if (atomic_dec_and_test(&tmt->tmt_refcount))
464                 top_multiple_thandle_destroy(tmt);
465 }
466
467 struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
468                                        struct dt_device *dt_dev);
469
470 /* update_records.c */
471 int update_records_create_pack(const struct lu_env *env,
472                                struct update_ops *ops,
473                                unsigned int *op_count,
474                                size_t *max_ops_size,
475                                struct update_params *params,
476                                unsigned int *param_count,
477                                size_t *max_param_size,
478                                const struct lu_fid *fid,
479                                const struct lu_attr *attr,
480                                const struct dt_allocation_hint *hint,
481                                struct dt_object_format *dof);
482 int update_records_attr_set_pack(const struct lu_env *env,
483                                  struct update_ops *ops,
484                                  unsigned int *op_count,
485                                  size_t *max_ops_size,
486                                  struct update_params *params,
487                                  unsigned int *param_count,
488                                  size_t *max_param_size,
489                                  const struct lu_fid *fid,
490                                  const struct lu_attr *attr);
491 int update_records_ref_add_pack(const struct lu_env *env,
492                                 struct update_ops *ops,
493                                 unsigned int *op_count,
494                                 size_t *max_ops_size,
495                                 struct update_params *params,
496                                 unsigned int *param_count,
497                                 size_t *max_param_size,
498                                 const struct lu_fid *fid);
499 int update_records_ref_del_pack(const struct lu_env *env,
500                                 struct update_ops *ops,
501                                 unsigned int *op_count,
502                                 size_t *max_ops_size,
503                                 struct update_params *params,
504                                 unsigned int *param_count,
505                                 size_t *max_param_size,
506                                 const struct lu_fid *fid);
507 int update_records_object_destroy_pack(const struct lu_env *env,
508                                        struct update_ops *ops,
509                                        unsigned int *op_count,
510                                        size_t *max_ops_size,
511                                        struct update_params *params,
512                                        unsigned int *param_count,
513                                        size_t *max_param_size,
514                                        const struct lu_fid *fid);
515 int update_records_index_insert_pack(const struct lu_env *env,
516                                      struct update_ops *ops,
517                                      unsigned int *op_count,
518                                      size_t *max_ops_size,
519                                      struct update_params *params,
520                                      unsigned int *param_count,
521                                      size_t *max_param_size,
522                                      const struct lu_fid *fid,
523                                      const struct dt_rec *rec,
524                                      const struct dt_key *key);
525 int update_records_index_delete_pack(const struct lu_env *env,
526                                      struct update_ops *ops,
527                                      unsigned int *op_count,
528                                      size_t *max_ops_size,
529                                      struct update_params *params,
530                                      unsigned int *param_count,
531                                      size_t *max_param_size,
532                                      const struct lu_fid *fid,
533                                      const struct dt_key *key);
534 int update_records_xattr_set_pack(const struct lu_env *env,
535                                   struct update_ops *ops,
536                                   unsigned int *op_count,
537                                   size_t *max_ops_size,
538                                   struct update_params *params,
539                                   unsigned int *param_count,
540                                   size_t *max_param_size,
541                                   const struct lu_fid *fid,
542                                   const struct lu_buf *buf, const char *name,
543                                   __u32 flag);
544 int update_records_xattr_del_pack(const struct lu_env *env,
545                                   struct update_ops *ops,
546                                   unsigned int *op_count,
547                                   size_t *max_ops_size,
548                                   struct update_params *params,
549                                   unsigned int *param_count,
550                                   size_t *max_param_size,
551                                   const struct lu_fid *fid,
552                                   const char *name);
553 int update_records_write_pack(const struct lu_env *env,
554                               struct update_ops *ops,
555                               unsigned int *op_count,
556                               size_t *max_ops_size,
557                               struct update_params *params,
558                               unsigned int *param_count,
559                               size_t *max_param_size,
560                               const struct lu_fid *fid,
561                               const struct lu_buf *buf,
562                               __u64 pos);
563 int update_records_punch_pack(const struct lu_env *env,
564                               struct update_ops *ops,
565                               unsigned int *op_count,
566                               size_t *max_ops_size,
567                               struct update_params *params,
568                               unsigned int *param_count,
569                               size_t *max_param_size,
570                               const struct lu_fid *fid,
571                               __u64 start, __u64 end);
572
573 int tur_update_records_extend(struct thandle_update_records *tur,
574                               size_t new_size);
575 int tur_update_params_extend(struct thandle_update_records *tur,
576                              size_t new_size);
577 int tur_update_extend(struct thandle_update_records *tur,
578                       size_t new_op_size, size_t new_param_size);
579
580 #define update_record_pack(name, th, ...)                               \
581 ({                                                                      \
582         struct top_thandle *top_th;                                     \
583         struct top_multiple_thandle *tmt;                               \
584         struct thandle_update_records *tur;                             \
585         struct llog_update_record     *lur;                             \
586         size_t          avail_param_size;                               \
587         size_t          avail_op_size;                                  \
588         int             ret;                                            \
589                                                                         \
590         while (1) {                                                     \
591                 top_th = container_of(th, struct top_thandle, tt_super);\
592                 tmt = top_th->tt_multiple_thandle;                      \
593                 tur = tmt->tmt_update_records;                          \
594                 lur = tur->tur_update_records;                          \
595                 avail_param_size = tur->tur_update_params_buf_size -    \
596                              update_params_size(tur->tur_update_params, \
597                                         tur->tur_update_param_count);   \
598                 avail_op_size = tur->tur_update_records_buf_size -      \
599                                 llog_update_record_size(lur);           \
600                 ret = update_records_##name##_pack(env,                 \
601                                           &lur->lur_update_rec.ur_ops,  \
602                                   &lur->lur_update_rec.ur_update_count, \
603                                   &avail_op_size,                       \
604                                   tur->tur_update_params,               \
605                                   &tur->tur_update_param_count,         \
606                                   &avail_param_size, __VA_ARGS__);      \
607                 if (ret == -E2BIG) {                                    \
608                         ret = tur_update_extend(tur, avail_op_size,     \
609                                                    avail_param_size);   \
610                         if (ret != 0)                                   \
611                                 break;                                  \
612                         continue;                                       \
613                 } else {                                                \
614                         break;                                          \
615                 }                                                       \
616         }                                                               \
617         ret;                                                            \
618 })
619 #endif