Whamcloud - gitweb
LU-3536 lod: record update for cross-MDT operation
[fs/lustre-release.git] / lustre / target / update_records.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25
26 /*
27  * lustre/target/update_records.c
28  *
29  * This file implement the methods to pack updates as update records, which
30  * will be written to the disk as llog record, and might be used during
31  * recovery.
32  *
33  * For cross-MDT operation, all of updates of the operation needs to be
34  * recorded in the disk, then during recovery phase, the recovery thread
35  * will retrieve and redo these updates if it needed.
36  *
37  * See comments above struct update_records for the format of update_records.
38  *
39  * Author: Di Wang <di.wang@intel.com>
40  */
41 #define DEBUG_SUBSYSTEM S_CLASS
42
43 #include <lu_target.h>
44 #include <lustre_update.h>
45 #include <obd.h>
46 #include <obd_class.h>
47 #include "tgt_internal.h"
48
49 #define UPDATE_RECORDS_BUFFER_SIZE      8192
50 #define UPDATE_PARAMS_BUFFER_SIZE       8192
51 /**
52  * Dump update record.
53  *
54  * Dump all of updates in the update_records, mostly for debugging purpose.
55  *
56  * \param[in] records   update records to be dumpped
57  * \param[in] mask      debug level mask
58  * \param[in] dump_params if dump all of updates the updates.
59  *
60  */
61 void update_records_dump(const struct update_records *records,
62                          unsigned int mask, bool dump_updates)
63 {
64         const struct update_ops *ops;
65         const struct update_op  *op = NULL;
66         struct update_params    *params;
67         unsigned int            i;
68
69         ops = &records->ur_ops;
70         params = update_records_get_params(records);
71
72         CDEBUG(mask, "ops = %d params_count = %d\n", records->ur_update_count,
73                records->ur_param_count);
74
75         if (records->ur_update_count == 0)
76                 return;
77
78         if (!dump_updates)
79                 return;
80
81         op = &ops->uops_op[0];
82         for (i = 0; i < records->ur_update_count; i++) {
83                 unsigned int j;
84
85                 CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i,
86                        PFID(&op->uop_fid), update_op_str(op->uop_type),
87                        op->uop_param_count);
88
89                 for (j = 0;  j < op->uop_param_count; j++) {
90                         struct object_update_param *param;
91
92                         param = update_params_get_param(params,
93                                         (unsigned int)op->uop_params_off[j],
94                                         records->ur_param_count);
95
96                         LASSERT(param != NULL);
97                         CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n",
98                                param, j, op->uop_params_off[j], param->oup_len);
99                 }
100
101                 op = update_op_next_op(op);
102         }
103 }
104
105 /**
106  * Pack parameters to update records
107  *
108  * Find and insert parameter to update records, if the parameter
109  * already exists in \a params, then just return the offset of this
110  * parameter, otherwise insert the parameter and return its offset
111  *
112  * \param[in] params    update params in which to insert parameter
113  * \param[in] new_param parameters to be inserted.
114  * \param[in] new_param_size    the size of \a new_param
115  *
116  * \retval              index inside \a params if parameter insertion
117  *                      succeeds.
118  * \retval              negative errno if it fails.
119  */
120 static unsigned int update_records_param_pack(struct update_params *params,
121                                               const void *new_param,
122                                               size_t new_param_size,
123                                               unsigned int *param_count)
124 {
125         struct object_update_param      *param;
126         unsigned int                    i;
127
128         for (i = 0; i < *param_count; i++) {
129                 struct object_update_param *param;
130
131                 param = update_params_get_param(params, i, *param_count);
132                 if ((new_param == NULL && param->oup_len == new_param_size) ||
133                     (param->oup_len == new_param_size &&
134                      memcmp(param->oup_buf, new_param, new_param_size) == 0))
135                         /* Found the parameter and return its index */
136                         return i;
137         }
138
139         param = (struct object_update_param *)((char *)params +
140                                 update_params_size(params, *param_count));
141
142         param->oup_len = new_param_size;
143         if (new_param != NULL)
144                 memcpy(param->oup_buf, new_param, new_param_size);
145
146         *param_count = *param_count + 1;
147
148         return *param_count - 1;
149 }
150
151 /**
152  * Pack update to update records
153  *
154  * Pack the update and its parameters to the update records. First it will
155  * insert parameters, get the offset of these parameter, then fill the
156  * update with these offset. If insertion exceed the maximum size of
157  * current update records, it will return -E2BIG here, and the caller might
158  * extend the update_record size \see lod_updates_pack.
159  *
160  * \param[in] env       execution environment
161  * \param[in] fid       FID of the update.
162  * \param[in] op_type   operation type of the update
163  * \param[in] ops       ur_ops in update records
164  * \param[in|out] op_count      pointer to the count of ops
165  * \param[in|out] max_op_size maximum size of the update
166  * \param[in] params    ur_params in update records
167  * \param[in|out] param_count   pointer to the count of params
168  * \param[in|out] max_param_size maximum size of the parameter
169  * \param[in] param_bufs        buffers of parameters
170  * \param[in] params_buf_count  the count of the parameter buffers
171  * \param[in] param_size        sizes of parameters
172  *
173  * \retval              0 if packing succeeds
174  * \retval              negative errno if packing fails
175  */
176 static int update_records_update_pack(const struct lu_env *env,
177                                       const struct lu_fid *fid,
178                                       enum update_type op_type,
179                                       struct update_ops *ops,
180                                       unsigned int *op_count,
181                                       size_t *max_op_size,
182                                       struct update_params *params,
183                                       unsigned int *param_count,
184                                       size_t *max_param_size,
185                                       unsigned int param_bufs_count,
186                                       const void **param_bufs,
187                                       size_t *param_sizes)
188 {
189         struct update_op        *op;
190         size_t                  total_param_sizes = 0;
191         int                     index;
192         unsigned int            i;
193
194         /* Check whether the packing exceeding the maximum update size */
195         if (unlikely(*max_op_size < update_op_size(param_bufs_count))) {
196                 CDEBUG(D_INFO, "max_op_size = %zu update_op = %zu\n",
197                        *max_op_size, update_op_size(param_bufs_count));
198                 *max_op_size = update_op_size(param_bufs_count);
199                 return -E2BIG;
200         }
201
202         for (i = 0; i < param_bufs_count; i++)
203                 total_param_sizes +=
204                         cfs_size_round(sizeof(struct object_update_param) +
205                                        param_sizes[i]);
206
207         /* Check whether the packing exceeding the maximum parameter size */
208         if (unlikely(*max_param_size < total_param_sizes)) {
209                 CDEBUG(D_INFO, "max_param_size = %zu params size = %zu\n",
210                        *max_param_size, total_param_sizes);
211
212                 *max_param_size = total_param_sizes;
213                 return -E2BIG;
214         }
215
216         op = update_ops_get_op(ops, *op_count, *op_count);
217         op->uop_fid = *fid;
218         op->uop_type = op_type;
219         op->uop_param_count = param_bufs_count;
220         for (i = 0; i < param_bufs_count; i++) {
221                 index = update_records_param_pack(params, param_bufs[i],
222                                                   param_sizes[i], param_count);
223                 if (index < 0)
224                         return index;
225
226                 CDEBUG(D_INFO, "%s %uth param offset = %d size = %zu\n",
227                        update_op_str(op_type), i, index, param_sizes[i]);
228
229                 op->uop_params_off[i] = index;
230         }
231         CDEBUG(D_INFO, "%huth "DFID" %s param_count = %u\n",
232                *op_count, PFID(fid), update_op_str(op_type), *param_count);
233
234         *op_count = *op_count + 1;
235
236         return 0;
237 }
238
239 /**
240  * Pack create update
241  *
242  * Pack create update into update records.
243  *
244  * \param[in] env       execution environment
245  * \param[in] ops       ur_ops in update records
246  * \param[in|out] op_count      pointer to the count of ops
247  * \param[in|out] max_op_size maximum size of the update
248  * \param[in] params    ur_params in update records
249  * \param[in|out] param_count   pointer to the count of params
250  * \param[in|out] max_param_size maximum size of the parameter
251  * \param[in] fid       FID of the object to be created
252  * \param[in] attr      attribute of the object to be created
253  * \param[in] hint      creation hint
254  * \param[in] dof       creation format information
255  *
256  * \retval              0 if packing succeeds.
257  * \retval              negative errno if packing fails.
258  */
259 int update_records_create_pack(const struct lu_env *env,
260                                struct update_ops *ops,
261                                unsigned int *op_count,
262                                size_t *max_ops_size,
263                                struct update_params *params,
264                                unsigned int *param_count,
265                                size_t *max_param_size,
266                                const struct lu_fid *fid,
267                                const struct lu_attr *attr,
268                                const struct dt_allocation_hint *hint,
269                                struct dt_object_format *dof)
270 {
271         size_t                  sizes[2];
272         const void              *bufs[2];
273         int                     buf_count = 0;
274         const struct lu_fid     *parent_fid = NULL;
275         struct lu_fid           tmp_fid;
276         int                     rc;
277         struct obdo             *obdo;
278
279         if (attr != NULL) {
280                 obdo = &update_env_info(env)->uti_obdo;
281                 obdo->o_valid = 0;
282                 obdo_from_la(obdo, attr, attr->la_valid);
283                 lustre_set_wire_obdo(NULL, obdo, obdo);
284                 bufs[buf_count] = obdo;
285                 sizes[buf_count] = sizeof(*obdo);
286                 buf_count++;
287         }
288
289         if (hint != NULL && hint->dah_parent != NULL) {
290                 parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
291                 fid_cpu_to_le(&tmp_fid, parent_fid);
292                 bufs[buf_count] = &tmp_fid;
293                 sizes[buf_count] = sizeof(tmp_fid);
294                 buf_count++;
295         }
296
297         rc = update_records_update_pack(env, fid, OUT_CREATE, ops, op_count,
298                                         max_ops_size, params, param_count,
299                                         max_param_size, buf_count, bufs, sizes);
300         return rc;
301 }
302 EXPORT_SYMBOL(update_records_create_pack);
303
304 /**
305  * Pack attr set update
306  *
307  * Pack attr_set update into update records.
308  *
309  * \param[in] env       execution environment
310  * \param[in] ops       ur_ops in update records
311  * \param[in|out] op_count pointer to the count of ops
312  * \param[in|out] max_op_size maximum size of the update
313  * \param[in] params    ur_params in update records
314  * \param[in|out] param_count pointer to the count of params
315  * \param[in|out] max_param_size maximum size of the parameter
316  * \param[in] fid       FID of the object to set attr
317  * \param[in] attr      attribute of attr set
318  *
319  * \retval              0 if packing succeeds.
320  * \retval              negative errno if packing fails.
321  */
322 int update_records_attr_set_pack(const struct lu_env *env,
323                                  struct update_ops *ops,
324                                  unsigned int *op_count,
325                                  size_t *max_ops_size,
326                                  struct update_params *params,
327                                  unsigned int *param_count,
328                                  size_t *max_param_size,
329                                  const struct lu_fid *fid,
330                                  const struct lu_attr *attr)
331 {
332         struct obdo *obdo = &update_env_info(env)->uti_obdo;
333         size_t size = sizeof(*obdo);
334
335         obdo->o_valid = 0;
336         obdo_from_la(obdo, attr, attr->la_valid);
337         lustre_set_wire_obdo(NULL, obdo, obdo);
338         return update_records_update_pack(env, fid, OUT_ATTR_SET, ops, op_count,
339                                           max_ops_size, params, param_count,
340                                           max_param_size, 1,
341                                           (const void **)&obdo, &size);
342 }
343 EXPORT_SYMBOL(update_records_attr_set_pack);
344
345 /**
346  * Pack ref add update
347  *
348  * Pack ref add update into update records.
349  *
350  * \param[in] env       execution environment
351  * \param[in] ops       ur_ops in update records
352  * \param[in|out] op_count pointer to the count of ops
353  * \param[in|out] max_op_size maximum size of the update
354  * \param[in] params    ur_params in update records
355  * \param[in|out] param_count pointer to the count of params
356  * \param[in|out] max_param_size maximum size of the parameter
357  * \param[in] fid       FID of the object to add reference
358  *
359  * \retval              0 if packing succeeds.
360  * \retval              negative errno if packing fails.
361  */
362 int update_records_ref_add_pack(const struct lu_env *env,
363                                 struct update_ops *ops,
364                                 unsigned int *op_count,
365                                 size_t *max_ops_size,
366                                 struct update_params *params,
367                                 unsigned int *param_count,
368                                 size_t *max_param_size,
369                                 const struct lu_fid *fid)
370 {
371         return update_records_update_pack(env, fid, OUT_REF_ADD, ops, op_count,
372                                           max_ops_size, params, param_count,
373                                           max_param_size, 0, NULL, NULL);
374 }
375 EXPORT_SYMBOL(update_records_ref_add_pack);
376
377 /**
378  * Pack ref del update
379  *
380  * Pack ref del update into update records.
381  *
382  * \param[in] env       execution environment
383  * \param[in] ops       ur_ops in update records
384  * \param[in|out] op_count      pointer to the count of ops
385  * \param[in|out] max_op_size maximum size of the update
386  * \param[in] params    ur_params in update records
387  * \param[in] param_count       pointer to the count of params
388  * \param[in|out] max_param_size maximum size of the parameter
389  * \param[in] fid       FID of the object to delete reference
390  *
391  * \retval              0 if packing succeeds.
392  * \retval              negative errno if packing fails.
393  */
394 int update_records_ref_del_pack(const struct lu_env *env,
395                                 struct update_ops *ops,
396                                 unsigned int *op_count,
397                                 size_t *max_ops_size,
398                                 struct update_params *params,
399                                 unsigned int *param_count,
400                                 size_t *max_param_size,
401                                 const struct lu_fid *fid)
402 {
403         return update_records_update_pack(env, fid, OUT_REF_DEL, ops, op_count,
404                                           max_ops_size, params, param_count,
405                                           max_param_size, 0, NULL, NULL);
406 }
407 EXPORT_SYMBOL(update_records_ref_del_pack);
408
409 /**
410  * Pack object destroy update
411  *
412  * Pack object destroy update into update records.
413  *
414  * \param[in] env       execution environment
415  * \param[in] ops       ur_ops in update records
416  * \param[in|out] op_count pointer to the count of ops
417  * \param[in|out] max_op_size maximum size of the update
418  * \param[in] params    ur_params in update records
419  * \param[in|out] param_count   pointer to the count of params
420  * \param[in|out] max_param_size maximum size of the parameter
421  * \param[in] fid       FID of the object to delete reference
422  *
423  * \retval              0 if packing succeeds.
424  * \retval              negative errno if packing fails.
425  */
426 int update_records_object_destroy_pack(const struct lu_env *env,
427                                        struct update_ops *ops,
428                                        unsigned int *op_count,
429                                        size_t *max_ops_size,
430                                        struct update_params *params,
431                                        unsigned int *param_count,
432                                        size_t *max_param_size,
433                                        const struct lu_fid *fid)
434 {
435         return update_records_update_pack(env, fid, OUT_DESTROY, ops, op_count,
436                                           max_ops_size, params, param_count,
437                                           max_param_size, 0, NULL, NULL);
438 }
439 EXPORT_SYMBOL(update_records_object_destroy_pack);
440
441 /**
442  * Pack index insert update
443  *
444  * Pack index insert update into update records.
445  *
446  * \param[in] env       execution environment
447  * \param[in] ops       ur_ops in update records
448  * \param[in] op_count  pointer to the count of ops
449  * \param[in|out] max_op_size maximum size of the update
450  * \param[in] params    ur_params in update records
451  * \param[in] param_count       pointer to the count of params
452  * \param[in|out] max_param_size maximum size of the parameter
453  * \param[in] fid       FID of the object to insert index
454  * \param[in] rec       record of insertion
455  * \param[in] key       key of insertion
456  *
457  * \retval              0 if packing succeeds.
458  * \retval              negative errno if packing fails.
459  */
460 int update_records_index_insert_pack(const struct lu_env *env,
461                                      struct update_ops *ops,
462                                      unsigned int *op_count,
463                                      size_t *max_ops_size,
464                                      struct update_params *params,
465                                      unsigned int *param_count,
466                                      size_t *max_param_size,
467                                      const struct lu_fid *fid,
468                                      const struct dt_rec *rec,
469                                      const struct dt_key *key)
470 {
471         struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
472         struct lu_fid              rec_fid;
473         __u32                      type = cpu_to_le32(rec1->rec_type);
474         size_t                     sizes[3] = { strlen((const char *)key) + 1,
475                                                 sizeof(rec_fid),
476                                                 sizeof(type) };
477         const void                 *bufs[3] = { key,
478                                                 &rec_fid,
479                                                 &type };
480
481         fid_cpu_to_le(&rec_fid, rec1->rec_fid);
482
483         return update_records_update_pack(env, fid, OUT_INDEX_INSERT, ops,
484                                           op_count, max_ops_size, params,
485                                           param_count, max_param_size,
486                                           3, bufs, sizes);
487 }
488 EXPORT_SYMBOL(update_records_index_insert_pack);
489
490 /**
491  * Pack index delete update
492  *
493  * Pack index delete update into update records.
494  *
495  * \param[in] env       execution environment
496  * \param[in] ops       ur_ops in update records
497  * \param[in|out] op_count      pointer to the count of ops
498  * \param[in|out] max_op_size maximum size of the update
499  * \param[in] params    ur_params in update records
500  * \param[in|ount] param_count  pointer to the count of params
501  * \param[in|out] max_param_size maximum size of the parameter
502  * \param[in] fid       FID of the object to delete index
503  * \param[in] key       key of deletion
504  *
505  * \retval              0 if packing succeeds.
506  * \retval              negative errno if packing fails.
507  */
508 int update_records_index_delete_pack(const struct lu_env *env,
509                                      struct update_ops *ops,
510                                      unsigned int *op_count,
511                                      size_t *max_ops_size,
512                                      struct update_params *params,
513                                      unsigned int *param_count,
514                                      size_t *max_param_size,
515                                      const struct lu_fid *fid,
516                                      const struct dt_key *key)
517 {
518         size_t size = strlen((const char *)key) + 1;
519
520         return update_records_update_pack(env, fid, OUT_INDEX_DELETE, ops,
521                                           op_count, max_ops_size, params,
522                                           param_count, max_param_size,
523                                           1, (const void **)&key, &size);
524 }
525 EXPORT_SYMBOL(update_records_index_delete_pack);
526
527 /**
528  * Pack xattr set update
529  *
530  * Pack xattr set update into update records.
531  *
532  * \param[in] env       execution environment
533  * \param[in] ops       ur_ops in update records
534  * \param[in|out] op_count      pointer to the count of ops
535  * \param[in|out] max_op_size maximum size of the update
536  * \param[in] params    ur_params in update records
537  * \param[in|out] param_count   pointer to the count of params
538  * \param[in|out] max_param_size maximum size of the parameter
539  * \param[in] fid       FID of the object to set xattr
540  * \param[in] buf       xattr to be set
541  * \param[in] name      name of the xattr
542  * \param[in] flag      flag for setting xattr
543  *
544  * \retval              0 if packing succeeds.
545  * \retval              negative errno if packing fails.
546  */
547 int update_records_xattr_set_pack(const struct lu_env *env,
548                                   struct update_ops *ops,
549                                   unsigned int *op_count,
550                                   size_t *max_ops_size,
551                                   struct update_params *params,
552                                   unsigned int *param_count,
553                                   size_t *max_param_size,
554                                   const struct lu_fid *fid,
555                                   const struct lu_buf *buf, const char *name,
556                                   __u32 flag)
557 {
558         size_t  sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
559         const void *bufs[3] = {name, buf->lb_buf, &flag};
560
561         flag = cpu_to_le32(flag);
562
563         return update_records_update_pack(env, fid, OUT_XATTR_SET, ops,
564                                           op_count, max_ops_size, params,
565                                           param_count, max_param_size,
566                                           3, bufs, sizes);
567 }
568 EXPORT_SYMBOL(update_records_xattr_set_pack);
569
570 /**
571  * Pack xattr delete update
572  *
573  * Pack xattr delete update into update records.
574  *
575  * \param[in] env       execution environment
576  * \param[in] ops       ur_ops in update records
577  * \param[in|out] op_count      pointer to the count of ops
578  * \param[in|out] max_op_size maximum size of the update
579  * \param[in] params    ur_params in update records
580  * \param[in|out] param_count   pointer to the count of params
581  * \param[in|out] max_param_size maximum size of the parameter
582  * \param[in] fid       FID of the object to delete xattr
583  * \param[in] name      name of the xattr
584  *
585  * \retval              0 if packing succeeds.
586  * \retval              negative errno if packing fails.
587  */
588 int update_records_xattr_del_pack(const struct lu_env *env,
589                                   struct update_ops *ops,
590                                   unsigned int *op_count,
591                                   size_t *max_ops_size,
592                                   struct update_params *params,
593                                   unsigned int *param_count,
594                                   size_t *max_param_size,
595                                   const struct lu_fid *fid,
596                                   const char *name)
597 {
598         size_t  size = strlen(name) + 1;
599
600         return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops,
601                                           op_count, max_ops_size, params,
602                                           param_count, max_param_size,
603                                           1, (const void **)&name, &size);
604 }
605 EXPORT_SYMBOL(update_records_xattr_del_pack);
606
607 /**
608  * Pack write update
609  *
610  * Pack write update into update records.
611  *
612  * \param[in] env       execution environment
613  * \param[in] ops       ur_ops in update records
614  * \param[in|out] op_count      pointer to the count of ops
615  * \param[in|out] max_op_size maximum size of the update
616  * \param[in] params    ur_params in update records
617  * \param[in|out] param_count   pointer to the count of params
618  * \param[in|out] max_param_size maximum size of the parameter
619  * \param[in] fid       FID of the object to write into
620  * \param[in] buf       buffer to write which includes an embedded size field
621  * \param[in] pos       offet in the object to start writing at
622  *
623  * \retval              0 if packing succeeds.
624  * \retval              negative errno if packing fails.
625  */
626 int update_records_write_pack(const struct lu_env *env,
627                               struct update_ops *ops,
628                               unsigned int *op_count,
629                               size_t *max_ops_size,
630                               struct update_params *params,
631                               unsigned int *param_count,
632                               size_t *max_param_size,
633                               const struct lu_fid *fid,
634                               const struct lu_buf *buf,
635                               __u64 pos)
636 {
637         size_t          sizes[2] = {buf->lb_len, sizeof(pos)};
638         const void      *bufs[2] = {buf->lb_buf, &pos};
639
640         pos = cpu_to_le64(pos);
641
642         return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops,
643                                           op_count, max_ops_size, params,
644                                           param_count, max_param_size,
645                                           2, bufs, sizes);
646 }
647 EXPORT_SYMBOL(update_records_write_pack);
648
649 /**
650  * Create update records in thandle_update_records
651  *
652  * Allocate update_records for thandle_update_records, the initial size
653  * will be 4KB.
654  *
655  * \param[in] tur       thandle_update_records where update_records will be
656  *                      allocated
657  * \retval              0 if allocation succeeds.
658  * \retval              negative errno if allocation fails.
659  */
660 static int tur_update_records_create(struct thandle_update_records *tur)
661 {
662         if (tur->tur_update_records != NULL)
663                 return 0;
664
665         OBD_ALLOC_LARGE(tur->tur_update_records,
666                         UPDATE_RECORDS_BUFFER_SIZE);
667
668         if (tur->tur_update_records == NULL)
669                 return -ENOMEM;
670
671         tur->tur_update_records_buf_size = UPDATE_RECORDS_BUFFER_SIZE;
672
673         return 0;
674 }
675
676 /**
677  * Extend update records
678  *
679  * Extend update_records to the new size in thandle_update_records.
680  *
681  * \param[in] tur       thandle_update_records where update_records will be
682  *                      extended.
683  * \retval              0 if extension succeeds.
684  * \retval              negative errno if extension fails.
685  */
686 int tur_update_records_extend(struct thandle_update_records *tur,
687                               size_t new_size)
688 {
689         struct llog_update_record       *record;
690
691         OBD_ALLOC_LARGE(record, new_size);
692         if (record == NULL)
693                 return -ENOMEM;
694
695         if (tur->tur_update_records != NULL) {
696                 memcpy(record, tur->tur_update_records,
697                        tur->tur_update_records_buf_size);
698                 OBD_FREE_LARGE(tur->tur_update_records,
699                                tur->tur_update_records_buf_size);
700         }
701
702         tur->tur_update_records = record;
703         tur->tur_update_records_buf_size = new_size;
704
705         return 0;
706 }
707 EXPORT_SYMBOL(tur_update_records_extend);
708
709 /**
710  * Extend update records
711  *
712  * Extend update records in thandle to make sure it is able to hold
713  * the update with certain update_op and params size.
714  *
715  * \param [in] tur      thandle_update_records to be extend
716  * \param [in] new_op_size update_op size of the update record
717  * \param [in] new_param_size params size of the update record
718  *
719  * \retval              0 if the update_records is being extended.
720  * \retval              negative errno if the update_records is not being
721  *                      extended.
722  */
723 int tur_update_extend(struct thandle_update_records *tur,
724                       size_t new_op_size, size_t new_param_size)
725 {
726         size_t record_size;
727         size_t params_size;
728         size_t extend_size;
729         int rc;
730         ENTRY;
731
732         record_size = llog_update_record_size(tur->tur_update_records);
733         /* extend update records buffer */
734         if (new_op_size > (tur->tur_update_records_buf_size - record_size -
735                            sizeof(*tur->tur_update_records))) {
736                 extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE);
737                 rc = tur_update_records_extend(tur,
738                                 tur->tur_update_records_buf_size +
739                                 extend_size);
740                 if (rc != 0)
741                         RETURN(rc);
742         }
743
744         /* extend parameters buffer */
745         params_size = update_params_size(tur->tur_update_params,
746                                          tur->tur_update_param_count);
747         if (new_param_size > (tur->tur_update_params_buf_size -
748                               params_size)) {
749                 extend_size = round_up(new_param_size,
750                                        UPDATE_PARAMS_BUFFER_SIZE);
751                 rc = tur_update_params_extend(tur,
752                                 tur->tur_update_params_buf_size +
753                                 extend_size);
754                 if (rc != 0)
755                         RETURN(rc);
756         }
757
758         RETURN(0);
759 }
760 EXPORT_SYMBOL(tur_update_extend);
761
762 /**
763  * Create update params in thandle_update_records
764  *
765  * Allocate update_params for thandle_update_records, the initial size
766  * will be 4KB.
767  *
768  * \param[in] tur       thandle_update_records where update_params will be
769  *                      allocated
770  * \retval              0 if allocation succeeds.
771  * \retval              negative errno if allocation fails.
772  */
773 static int tur_update_params_create(struct thandle_update_records *tur)
774 {
775         if (tur->tur_update_params != NULL)
776                 return 0;
777
778         OBD_ALLOC_LARGE(tur->tur_update_params, UPDATE_PARAMS_BUFFER_SIZE);
779         if (tur->tur_update_params == NULL)
780                 return -ENOMEM;
781
782         tur->tur_update_params_buf_size = UPDATE_PARAMS_BUFFER_SIZE;
783         return 0;
784 }
785
786 /**
787  * Extend update params
788  *
789  * Extend update_params to the new size in thandle_update_records.
790  *
791  * \param[in] tur       thandle_update_records where update_params will be
792  *                      extended.
793  * \retval              0 if extension succeeds.
794  * \retval              negative errno if extension fails.
795  */
796 int tur_update_params_extend(struct thandle_update_records *tur,
797                              size_t new_size)
798 {
799         struct update_params    *params;
800
801         OBD_ALLOC_LARGE(params, new_size);
802         if (params == NULL)
803                 return -ENOMEM;
804
805         if (tur->tur_update_params != NULL) {
806                 memcpy(params, tur->tur_update_params,
807                        tur->tur_update_params_buf_size);
808                 OBD_FREE_LARGE(tur->tur_update_params,
809                                tur->tur_update_params_buf_size);
810         }
811
812         tur->tur_update_params = params;
813         tur->tur_update_params_buf_size = new_size;
814
815         return 0;
816 }
817 EXPORT_SYMBOL(tur_update_params_extend);
818
819 /**
820  * Check and prepare whether it needs to record update.
821  *
822  * Checks if the transaction needs to record updates, and if it
823  * does, then initialize the update record buffer in the transaction.
824  *
825  * \param[in] env       execution environment
826  * \param[in] th        transaction handle
827  *
828  * \retval              0 if updates recording succeeds.
829  * \retval              negative errno if updates recording fails.
830  */
831 int check_and_prepare_update_record(const struct lu_env *env,
832                                     struct thandle *th)
833 {
834         struct thandle_update_records   *tur;
835         struct llog_update_record       *lur;
836         struct top_thandle              *top_th;
837         struct sub_thandle              *lst;
838         int                             rc;
839         bool                            record_update = false;
840         ENTRY;
841
842         top_th = container_of(th, struct top_thandle, tt_super);
843         /* Check if it needs to record updates for this transaction */
844         list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
845                 if (lst->st_record_update) {
846                         record_update = true;
847                         break;
848                 }
849         }
850         if (!record_update)
851                 RETURN(0);
852
853         if (top_th->tt_update_records != NULL)
854                 RETURN(0);
855
856         tur = &update_env_info(env)->uti_tur;
857         if (tur->tur_update_records == NULL) {
858                 rc = tur_update_records_create(tur);
859                 if (rc < 0)
860                         RETURN(rc);
861         }
862
863         if (tur->tur_update_params == NULL) {
864                 rc = tur_update_params_create(tur);
865                 if (rc < 0)
866                         RETURN(rc);
867         }
868
869         lur = tur->tur_update_records;
870         lur->lur_update_rec.ur_update_count = 0;
871         lur->lur_update_rec.ur_param_count = 0;
872         lur->lur_update_rec.ur_master_transno = 0;
873         lur->lur_update_rec.ur_batchid = 0;
874         lur->lur_update_rec.ur_flags = 0;
875
876         tur->tur_update_param_count = 0;
877
878         top_th->tt_update_records = tur;
879
880         RETURN(0);
881 }
882 EXPORT_SYMBOL(check_and_prepare_update_record);
883
884 /**
885  * Merge params into the update records
886  *
887  * Merge params and ops into the update records attached to top th.
888  * During transaction execution phase, parameters and update ops
889  * are collected in two different buffers (see lod_updates_pack()),
890  * then in transaction stop, it needs to be merged them in one
891  * buffer, then being written into the update log.
892  *
893  * \param[in] env       execution environment
894  * \param[in] tur       thandle update records whose updates and
895  *                      parameters are merged
896  *
897  * \retval              0 if merging succeeds.
898  * \retval              negaitive errno if merging fails.
899  */
900 int merge_params_updates_buf(const struct lu_env *env,
901                              struct thandle_update_records *tur)
902 {
903         struct llog_update_record *lur;
904         struct update_params *params;
905         size_t params_size;
906         size_t record_size;
907
908         if (tur->tur_update_records == NULL ||
909             tur->tur_update_params == NULL)
910                 return 0;
911
912         lur = tur->tur_update_records;
913         /* Extends the update records buffer if needed */
914         params_size = update_params_size(tur->tur_update_params,
915                                          tur->tur_update_param_count);
916         record_size = llog_update_record_size(lur);
917         if (record_size + params_size > tur->tur_update_records_buf_size) {
918                 int rc;
919
920                 rc = tur_update_records_extend(tur, record_size + params_size);
921                 if (rc < 0)
922                         return rc;
923                 lur = tur->tur_update_records;
924         }
925
926         params = update_records_get_params(&lur->lur_update_rec);
927         memcpy(params, tur->tur_update_params, params_size);
928         lur->lur_update_rec.ur_param_count = tur->tur_update_param_count;
929         return 0;
930 }
931 EXPORT_SYMBOL(merge_params_updates_buf);
932
933 static void update_key_fini(const struct lu_context *ctx,
934                             struct lu_context_key *key, void *data)
935 {
936         struct update_thread_info *info = data;
937
938         if (info->uti_tur.tur_update_records != NULL)
939                 OBD_FREE_LARGE(info->uti_tur.tur_update_records,
940                                info->uti_tur.tur_update_records_buf_size);
941         if (info->uti_tur.tur_update_params != NULL)
942                 OBD_FREE_LARGE(info->uti_tur.tur_update_params,
943                                info->uti_tur.tur_update_params_buf_size);
944
945         OBD_FREE_PTR(info);
946 }
947
948 /* context key constructor/destructor: update_key_init, update_key_fini */
949 LU_KEY_INIT(update, struct update_thread_info);
950 /* context key: update_thread_key */
951 LU_CONTEXT_KEY_DEFINE(update, LCT_MD_THREAD | LCT_MG_THREAD |
952                               LCT_DT_THREAD | LCT_TX_HANDLE | LCT_LOCAL);
953 EXPORT_SYMBOL(update_thread_key);
954 LU_KEY_INIT_GENERIC(update);
955
956 void update_info_init(void)
957 {
958         update_key_init_generic(&update_thread_key, NULL);
959         lu_context_key_register(&update_thread_key);
960 }
961
962 void update_info_fini(void)
963 {
964         lu_context_key_degister(&update_thread_key);
965 }