Whamcloud - gitweb
b6e5cf4ff1d9d29eafc2aacdd599d85984ab2044
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into an osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * There are three kinds of transactions
57  *
58  * 1. Local transaction, all of updates of the transaction are in the local MDT.
59  * 2. Remote transaction, all of updates of the transaction are in one remote
60  * MDT, which only happens in LFSCK now.
61  * 3. Distribute transaction, updates for the transaction are in mulitple MDTs.
62  *
63  * Author: Di Wang <di.wang@intel.com>
64  * Author: Fan, Yong <fan.yong@intel.com>
65  */
66
67 #define DEBUG_SUBSYSTEM S_MDS
68
69 #include "osp_internal.h"
70
71 /**
72  * The argument for the interpreter callback of osp request.
73  */
74 struct osp_update_args {
75         struct dt_update_request *oaua_update;
76         atomic_t                 *oaua_count;
77         wait_queue_head_t        *oaua_waitq;
78         bool                      oaua_flow_control;
79 };
80
81 /**
82  * Call back for each update request.
83  */
84 struct osp_update_callback {
85         /* list in the dt_update_request::dur_cb_items */
86         struct list_head                 ouc_list;
87
88         /* The target of the async update request. */
89         struct osp_object               *ouc_obj;
90
91         /* The data used by or_interpreter. */
92         void                            *ouc_data;
93
94         /* The interpreter function called after the async request handled. */
95         osp_update_interpreter_t        ouc_interpreter;
96 };
97
98 static struct object_update_request *object_update_request_alloc(size_t size)
99 {
100         struct object_update_request *ourq;
101
102         OBD_ALLOC_LARGE(ourq, size);
103         if (ourq == NULL)
104                 return ERR_PTR(-ENOMEM);
105
106         ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
107         ourq->ourq_count = 0;
108
109         return ourq;
110 }
111
112 static void object_update_request_free(struct object_update_request *ourq,
113                                        size_t ourq_size)
114 {
115         if (ourq != NULL)
116                 OBD_FREE_LARGE(ourq, ourq_size);
117 }
118
119 /**
120  * Allocate and initialize dt_update_request
121  *
122  * dt_update_request is being used to track updates being executed on
123  * this dt_device(OSD or OSP). The update buffer will be 4k initially,
124  * and increased if needed.
125  *
126  * \param [in] dt       dt device
127  *
128  * \retval              dt_update_request being allocated if succeed
129  * \retval              ERR_PTR(errno) if failed
130  */
131 struct dt_update_request *dt_update_request_create(struct dt_device *dt)
132 {
133         struct dt_update_request *dt_update;
134         struct object_update_request *ourq;
135
136         OBD_ALLOC_PTR(dt_update);
137         if (dt_update == NULL)
138                 return ERR_PTR(-ENOMEM);
139
140         ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
141         if (IS_ERR(ourq)) {
142                 OBD_FREE_PTR(dt_update);
143                 return ERR_CAST(ourq);
144         }
145
146         dt_update->dur_buf.ub_req = ourq;
147         dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
148
149         dt_update->dur_dt = dt;
150         dt_update->dur_batchid = 0;
151         INIT_LIST_HEAD(&dt_update->dur_cb_items);
152
153         return dt_update;
154 }
155
156 /**
157  * Destroy dt_update_request
158  *
159  * \param [in] dt_update        dt_update_request being destroyed
160  */
161 void dt_update_request_destroy(struct dt_update_request *dt_update)
162 {
163         if (dt_update == NULL)
164                 return;
165
166         object_update_request_free(dt_update->dur_buf.ub_req,
167                                    dt_update->dur_buf.ub_req_size);
168         OBD_FREE_PTR(dt_update);
169 }
170
171 static void
172 object_update_request_dump(const struct object_update_request *ourq,
173                            unsigned int mask)
174 {
175         unsigned int i;
176         size_t total_size = 0;
177
178         for (i = 0; i < ourq->ourq_count; i++) {
179                 struct object_update    *update;
180                 size_t                  size = 0;
181
182                 update = object_update_request_get(ourq, i, &size);
183                 LASSERT(update != NULL);
184                 CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u"
185                        "params = %d batchid = "LPU64" size = %zu\n",
186                        i, PFID(&update->ou_fid),
187                        update_op_str(update->ou_type),
188                        update->ou_master_index, update->ou_params_count,
189                        update->ou_batchid, size);
190
191                 total_size += size;
192         }
193
194         CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
195                ourq->ourq_magic, ourq->ourq_count, total_size);
196 }
197
198 /**
199  * Allocate an osp request and initialize it with the given parameters.
200  *
201  * \param[in] obj               pointer to the operation target
202  * \param[in] data              pointer to the data used by the interpreter
203  * \param[in] interpreter       pointer to the interpreter function
204  *
205  * \retval                      pointer to the asychronous request
206  * \retval                      NULL if the allocation failed
207  */
208 static struct osp_update_callback *
209 osp_update_callback_init(struct osp_object *obj, void *data,
210                          osp_update_interpreter_t interpreter)
211 {
212         struct osp_update_callback *ouc;
213
214         OBD_ALLOC_PTR(ouc);
215         if (ouc == NULL)
216                 return NULL;
217
218         lu_object_get(osp2lu_obj(obj));
219         INIT_LIST_HEAD(&ouc->ouc_list);
220         ouc->ouc_obj = obj;
221         ouc->ouc_data = data;
222         ouc->ouc_interpreter = interpreter;
223
224         return ouc;
225 }
226
227 /**
228  * Destroy the osp_update_callback.
229  *
230  * \param[in] env       pointer to the thread context
231  * \param[in] ouc       pointer to osp_update_callback
232  */
233 static void osp_update_callback_fini(const struct lu_env *env,
234                                      struct osp_update_callback *ouc)
235 {
236         LASSERT(list_empty(&ouc->ouc_list));
237
238         lu_object_put(env, osp2lu_obj(ouc->ouc_obj));
239         OBD_FREE_PTR(ouc);
240 }
241
242 /**
243  * Interpret the packaged OUT RPC results.
244  *
245  * For every packaged sub-request, call its registered interpreter function.
246  * Then destroy the sub-request.
247  *
248  * \param[in] env       pointer to the thread context
249  * \param[in] req       pointer to the RPC
250  * \param[in] arg       pointer to data used by the interpreter
251  * \param[in] rc        the RPC return value
252  *
253  * \retval              0 for success
254  * \retval              negative error number on failure
255  */
256 static int osp_update_interpret(const struct lu_env *env,
257                                 struct ptlrpc_request *req, void *arg, int rc)
258 {
259         struct object_update_reply      *reply  = NULL;
260         struct osp_update_args          *oaua   = arg;
261         struct dt_update_request        *dt_update = oaua->oaua_update;
262         struct osp_update_callback      *ouc;
263         struct osp_update_callback      *next;
264         int                              count  = 0;
265         int                              index  = 0;
266         int                              rc1    = 0;
267
268         if (oaua->oaua_flow_control)
269                 obd_put_request_slot(
270                                 &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
271
272         /* Unpack the results from the reply message. */
273         if (req->rq_repmsg != NULL) {
274                 reply = req_capsule_server_sized_get(&req->rq_pill,
275                                                      &RMF_OUT_UPDATE_REPLY,
276                                                      OUT_UPDATE_REPLY_SIZE);
277                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
278                         rc1 = -EPROTO;
279                 else
280                         count = reply->ourp_count;
281         } else {
282                 rc1 = rc;
283         }
284
285         list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items,
286                                  ouc_list) {
287                 list_del_init(&ouc->ouc_list);
288
289                 /* The peer may only have handled some requests (indicated
290                  * by the 'count') in the packaged OUT RPC, we can only get
291                  * results for the handled part. */
292                 if (index < count && reply->ourp_lens[index] > 0) {
293                         struct object_update_result *result;
294
295                         result = object_update_result_get(reply, index, NULL);
296                         if (result == NULL)
297                                 rc1 = -EPROTO;
298                         else
299                                 rc1 = result->our_rc;
300                 } else {
301                         rc1 = rc;
302                         if (unlikely(rc1 == 0))
303                                 rc1 = -EINVAL;
304                 }
305
306                 if (ouc->ouc_interpreter != NULL)
307                         ouc->ouc_interpreter(env, reply, req, ouc->ouc_obj,
308                                              ouc->ouc_data, index, rc1);
309
310                 osp_update_callback_fini(env, ouc);
311                 index++;
312         }
313
314         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
315                 wake_up_all(oaua->oaua_waitq);
316
317         dt_update_request_destroy(dt_update);
318
319         return 0;
320 }
321
322 /**
323  * Pack all the requests in the shared asynchronous idempotent request queue
324  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
325  *
326  * \param[in] env       pointer to the thread context
327  * \param[in] osp       pointer to the OSP device
328  * \param[in] update    pointer to the shared queue
329  *
330  * \retval              0 for success
331  * \retval              negative error number on failure
332  */
333 int osp_unplug_async_request(const struct lu_env *env,
334                              struct osp_device *osp,
335                              struct dt_update_request *update)
336 {
337         struct osp_update_args  *args;
338         struct ptlrpc_request   *req = NULL;
339         int                      rc;
340
341         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
342                                  update->dur_buf.ub_req, &req);
343         if (rc != 0) {
344                 struct osp_update_callback *ouc;
345                 struct osp_update_callback *next;
346
347                 list_for_each_entry_safe(ouc, next,
348                                          &update->dur_cb_items, ouc_list) {
349                         list_del_init(&ouc->ouc_list);
350                         if (ouc->ouc_interpreter != NULL)
351                                 ouc->ouc_interpreter(env, NULL, NULL,
352                                                      ouc->ouc_obj,
353                                                      ouc->ouc_data, 0, rc);
354                         osp_update_callback_fini(env, ouc);
355                 }
356                 dt_update_request_destroy(update);
357         } else {
358                 args = ptlrpc_req_async_args(req);
359                 args->oaua_update = update;
360                 args->oaua_count = NULL;
361                 args->oaua_waitq = NULL;
362                 args->oaua_flow_control = false;
363                 req->rq_interpret_reply = osp_update_interpret;
364                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
365         }
366
367         return rc;
368 }
369
370 /**
371  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
372  * request queue - osp_device::opd_async_requests.
373  *
374  * If the osp_device::opd_async_requests is not NULL, then return it directly;
375  * otherwise create new dt_update_request and attach it to opd_async_requests.
376  *
377  * \param[in] osp       pointer to the OSP device
378  *
379  * \retval              pointer to the shared queue
380  * \retval              negative error number on failure
381  */
382 static struct dt_update_request *
383 osp_find_or_create_async_update_request(struct osp_device *osp)
384 {
385         struct dt_update_request *update = osp->opd_async_requests;
386
387         if (update != NULL)
388                 return update;
389
390         update = dt_update_request_create(&osp->opd_dt_dev);
391         if (!IS_ERR(update))
392                 osp->opd_async_requests = update;
393
394         return update;
395 }
396
397 /**
398  * Insert an osp_update_callback into the dt_update_request.
399  *
400  * Insert an osp_update_callback to the dt_update_request. Usually each update
401  * in the dt_update_request will have one correspondent callback, and these
402  * callbacks will be called in rq_interpret_reply.
403  *
404  * \param[in] env               pointer to the thread context
405  * \param[in] obj               pointer to the operation target object
406  * \param[in] data              pointer to the data used by the interpreter
407  * \param[in] interpreter       pointer to the interpreter function
408  *
409  * \retval                      0 for success
410  * \retval                      negative error number on failure
411  */
412 int osp_insert_update_callback(const struct lu_env *env,
413                                struct dt_update_request *update,
414                                struct osp_object *obj, void *data,
415                                osp_update_interpreter_t interpreter)
416 {
417         struct osp_update_callback  *ouc;
418
419         ouc = osp_update_callback_init(obj, data, interpreter);
420         if (ouc == NULL)
421                 RETURN(-ENOMEM);
422
423         list_add_tail(&ouc->ouc_list, &update->dur_cb_items);
424
425         return 0;
426 }
427
428 /**
429  * Insert an asynchronous idempotent request to the shared request queue that
430  * is attached to the osp_device.
431  *
432  * This function generates a new osp_async_request with the given parameters,
433  * then tries to insert the request into the osp_device-based shared request
434  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
435  * the shared queue firstly, and then re-tries.
436  *
437  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
438  *       osp_insert_async_request call from others.
439  *
440  * \param[in] env               pointer to the thread context
441  * \param[in] op                operation type, see 'enum update_type'
442  * \param[in] obj               pointer to the operation target
443  * \param[in] count             array size of the subsequent \a lens and \a bufs
444  * \param[in] lens              buffer length array for the subsequent \a bufs
445  * \param[in] bufs              the buffers to compose the request
446  * \param[in] data              pointer to the data used by the interpreter
447  * \param[in] interpreter       pointer to the interpreter function
448  *
449  * \retval                      0 for success
450  * \retval                      negative error number on failure
451  */
452 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
453                              struct osp_object *obj, int count,
454                              __u16 *lens, const void **bufs, void *data,
455                              osp_update_interpreter_t interpreter)
456 {
457         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
458         struct dt_update_request        *update;
459         struct object_update            *object_update;
460         size_t                          max_update_size;
461         struct object_update_request    *ureq;
462         int                             rc = 0;
463         ENTRY;
464
465         update = osp_find_or_create_async_update_request(osp);
466         if (IS_ERR(update))
467                 RETURN(PTR_ERR(update));
468
469 again:
470         ureq = update->dur_buf.ub_req;
471         max_update_size = update->dur_buf.ub_req_size -
472                             object_update_request_size(ureq);
473
474         object_update = update_buffer_get_update(ureq, ureq->ourq_count);
475         rc = out_update_pack(env, object_update, max_update_size, op,
476                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
477         /* The queue is full. */
478         if (rc == -E2BIG) {
479                 osp->opd_async_requests = NULL;
480                 mutex_unlock(&osp->opd_async_requests_mutex);
481
482                 rc = osp_unplug_async_request(env, osp, update);
483                 mutex_lock(&osp->opd_async_requests_mutex);
484                 if (rc != 0)
485                         RETURN(rc);
486
487                 update = osp_find_or_create_async_update_request(osp);
488                 if (IS_ERR(update))
489                         RETURN(PTR_ERR(update));
490
491                 goto again;
492         } else {
493                 if (rc < 0)
494                         RETURN(rc);
495
496                 ureq->ourq_count++;
497         }
498
499         rc = osp_insert_update_callback(env, update, obj, data, interpreter);
500
501         RETURN(rc);
502 }
503
504 int osp_trans_update_request_create(struct thandle *th)
505 {
506         struct osp_thandle              *oth = thandle_to_osp_thandle(th);
507         struct dt_update_request        *update;
508
509         if (oth->ot_dur != NULL)
510                 return 0;
511
512         update = dt_update_request_create(th->th_dev);
513         if (IS_ERR(update)) {
514                 th->th_result = PTR_ERR(update);
515                 return PTR_ERR(update);
516         }
517
518         if (dt2osp_dev(th->th_dev)->opd_connect_mdt)
519                 update->dur_flags = UPDATE_FL_SYNC;
520
521         oth->ot_dur = update;
522         return 0;
523 }
524
525 static void osp_thandle_get(struct osp_thandle *oth)
526 {
527         atomic_inc(&oth->ot_refcount);
528 }
529
530 static void osp_thandle_put(struct osp_thandle *oth)
531 {
532         if (atomic_dec_and_test(&oth->ot_refcount))
533                 OBD_FREE_PTR(oth);
534 }
535
536 /**
537  * The OSP layer dt_device_operations::dt_trans_create() interface
538  * to create a transaction.
539  *
540  * There are two kinds of transactions that will involve OSP:
541  *
542  * 1) If the transaction only contains the updates on remote server
543  *    (MDT or OST), such as re-generating the lost OST-object for
544  *    LFSCK, then it is a remote transaction. For remote transaction,
545  *    the upper layer caller (such as the LFSCK engine) will call the
546  *    dt_trans_create() (with the OSP dt_device as the parameter),
547  *    then the call will be directed to the osp_trans_create() that
548  *    creates the transaction handler and returns it to the caller.
549  *
550  * 2) If the transcation contains both local and remote updates,
551  *    such as cross MDTs create under DNE mode, then the upper layer
552  *    caller will not trigger osp_trans_create(). Instead, it will
553  *    call dt_trans_create() on other dt_device, such as LOD that
554  *    will generate the transaction handler. Such handler will be
555  *    used by the whole transaction in subsequent sub-operations.
556  *
557  * \param[in] env       pointer to the thread context
558  * \param[in] d         pointer to the OSP dt_device
559  *
560  * \retval              pointer to the transaction handler
561  * \retval              negative error number on failure
562  */
563 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
564 {
565         struct osp_thandle              *oth;
566         struct thandle                  *th = NULL;
567         ENTRY;
568
569         OBD_ALLOC_PTR(oth);
570         if (unlikely(oth == NULL))
571                 RETURN(ERR_PTR(-ENOMEM));
572
573         th = &oth->ot_super;
574         th->th_dev = d;
575         th->th_tags = LCT_TX_HANDLE;
576
577         atomic_set(&oth->ot_refcount, 1);
578         INIT_LIST_HEAD(&oth->ot_dcb_list);
579
580         RETURN(th);
581 }
582
583 /**
584  * Prepare update request.
585  *
586  * Prepare OUT update ptlrpc request, and the request usually includes
587  * all of updates (stored in \param ureq) from one operation.
588  *
589  * \param[in] env      execution environment
590  * \param[in] imp      import on which ptlrpc request will be sent
591  * \param[in] ureq     hold all of updates which will be packed into the req
592  * \param[in] reqp     request to be created
593  *
594  * \retval             0 if preparation succeeds.
595  * \retval             negative errno if preparation fails.
596  */
597 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
598                         const struct object_update_request *ureq,
599                         struct ptlrpc_request **reqp)
600 {
601         struct ptlrpc_request           *req;
602         struct object_update_request    *tmp;
603         int                             ureq_len;
604         int                             rc;
605         ENTRY;
606
607         object_update_request_dump(ureq, D_INFO);
608         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
609         if (req == NULL)
610                 RETURN(-ENOMEM);
611
612         ureq_len = object_update_request_size(ureq);
613         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
614                              ureq_len);
615
616         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
617         if (rc != 0) {
618                 ptlrpc_req_finished(req);
619                 RETURN(rc);
620         }
621
622         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
623                             RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
624
625         tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
626         memcpy(tmp, ureq, ureq_len);
627
628         ptlrpc_request_set_replen(req);
629         req->rq_request_portal = OUT_PORTAL;
630         req->rq_reply_portal = OSC_REPLY_PORTAL;
631         *reqp = req;
632
633         RETURN(rc);
634 }
635
636 /**
637 * Send update RPC.
638 *
639 * Send update request to the remote MDT synchronously.
640 *
641 * \param[in] env        execution environment
642 * \param[in] imp        import on which ptlrpc request will be sent
643 * \param[in] dt_update  hold all of updates which will be packed into the req
644 * \param[in] reqp       request to be created
645 *
646 * \retval               0 if RPC succeeds.
647 * \retval               negative errno if RPC fails.
648 */
649 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
650                     struct dt_update_request *dt_update,
651                     struct ptlrpc_request **reqp)
652 {
653         struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
654         struct ptlrpc_request   *req = NULL;
655         int                     rc;
656         ENTRY;
657
658         rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
659         if (rc != 0)
660                 RETURN(rc);
661
662         /* This will only be called with read-only update, and these updates
663          * might be used to retrieve update log during recovery process, so
664          * it will be allowed to send during recovery process */
665         req->rq_allow_replay = 1;
666
667         /* Note: some dt index api might return non-zero result here, like
668          * osd_index_ea_lookup, so we should only check rc < 0 here */
669         rc = ptlrpc_queue_wait(req);
670         if (rc < 0) {
671                 ptlrpc_req_finished(req);
672                 dt_update->dur_rc = rc;
673                 RETURN(rc);
674         }
675
676         if (reqp != NULL) {
677                 *reqp = req;
678                 RETURN(rc);
679         }
680
681         dt_update->dur_rc = rc;
682
683         ptlrpc_req_finished(req);
684
685         RETURN(rc);
686 }
687
688 /**
689  * Add commit callback to transaction.
690  *
691  * Add commit callback to the osp thandle, which will be called
692  * when the thandle is committed remotely.
693  *
694  * \param[in] th        the thandle
695  * \param[in] dcb       commit callback structure
696  *
697  * \retval              only return 0 for now.
698  */
699 int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
700 {
701         struct osp_thandle *oth = thandle_to_osp_thandle(th);
702
703         LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
704         LASSERT(&dcb->dcb_func != NULL);
705         list_add(&dcb->dcb_linkage, &oth->ot_dcb_list);
706
707         return 0;
708 }
709
710 static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
711 {
712         struct dt_txn_commit_cb *dcb;
713         struct dt_txn_commit_cb *tmp;
714
715         LASSERT(atomic_read(&oth->ot_refcount) > 0);
716         /* call per-transaction callbacks if any */
717         list_for_each_entry_safe(dcb, tmp, &oth->ot_dcb_list,
718                                  dcb_linkage) {
719                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
720                          "commit callback entry: magic=%x name='%s'\n",
721                          dcb->dcb_magic, dcb->dcb_name);
722                 list_del_init(&dcb->dcb_linkage);
723                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
724         }
725 }
726
727 static void osp_request_commit_cb(struct ptlrpc_request *req)
728 {
729         struct thandle *th = req->rq_cb_data;
730         struct osp_thandle *oth = thandle_to_osp_thandle(th);
731         __u64                   last_committed_transno = 0;
732         int                     result = req->rq_status;
733         ENTRY;
734
735         if (lustre_msg_get_last_committed(req->rq_repmsg))
736                 last_committed_transno =
737                         lustre_msg_get_last_committed(req->rq_repmsg);
738
739         if (last_committed_transno <
740                 req->rq_import->imp_peer_committed_transno)
741                 last_committed_transno =
742                         req->rq_import->imp_peer_committed_transno;
743
744         CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n",
745                req->rq_transno, last_committed_transno);
746
747         /* If the transaction is not really committed, mark result = 1 */
748         if (req->rq_transno != 0 &&
749             (req->rq_transno > last_committed_transno) && result == 0)
750                 result = 1;
751
752         osp_trans_commit_cb(oth, result);
753         req->rq_committed = 1;
754         osp_thandle_put(oth);
755         EXIT;
756 }
757
758 /**
759  * Trigger the request for remote updates.
760  *
761  * If th_sync is set, then the request will be sent synchronously,
762  * otherwise, the RPC will be sent asynchronously.
763  *
764  * Please refer to osp_trans_create() for transaction type.
765  *
766  * \param[in] env               pointer to the thread context
767  * \param[in] osp               pointer to the OSP device
768  * \param[in] dt_update         pointer to the dt_update_request
769  * \param[in] th                pointer to the transaction handler
770  * \param[out] sent             whether the RPC has been sent
771  *
772  * \retval                      0 for success
773  * \retval                      negative error number on failure
774  */
775 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
776                              struct dt_update_request *dt_update,
777                              struct thandle *th, int *sent)
778 {
779         struct osp_update_args  *args;
780         struct ptlrpc_request   *req;
781         int     rc = 0;
782         ENTRY;
783
784         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
785                                  dt_update->dur_buf.ub_req, &req);
786         if (rc != 0)
787                 RETURN(rc);
788
789         *sent = 1;
790         req->rq_interpret_reply = osp_update_interpret;
791         args = ptlrpc_req_async_args(req);
792         args->oaua_update = dt_update;
793
794         if (is_only_remote_trans(th) && !th->th_sync &&
795             !th->th_wait_submit) {
796                 args->oaua_flow_control = true;
797
798                 if (!osp->opd_connect_mdt) {
799                         down_read(&osp->opd_async_updates_rwsem);
800                         args->oaua_count = &osp->opd_async_updates_count;
801                         args->oaua_waitq = &osp->opd_syn_barrier_waitq;
802                         up_read(&osp->opd_async_updates_rwsem);
803                         atomic_inc(args->oaua_count);
804                 }
805
806                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
807         } else {
808                 struct osp_thandle *oth = thandle_to_osp_thandle(th);
809                 struct lu_device *top_device;
810
811                 /* If the transaction is created during MDT recoverying
812                  * process, it means this is an recovery update, we need
813                  * to let OSP send it anyway without checking recoverying
814                  * status, in case the other target is being recoveried
815                  * at the same time, and if we wait here for the import
816                  * to be recoveryed, it might cause deadlock */
817                 top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
818                 if (top_device->ld_obd->obd_recovering)
819                         req->rq_allow_replay = 1;
820
821                 args->oaua_flow_control = false;
822                 req->rq_commit_cb = osp_request_commit_cb;
823                 req->rq_cb_data = th;
824                 osp_thandle_get(oth); /* for commit callback */
825                 osp_get_rpc_lock(osp);
826                 rc = ptlrpc_queue_wait(req);
827                 osp_put_rpc_lock(osp);
828                 if (req->rq_transno == 0 && !req->rq_committed)
829                         osp_thandle_put(oth);
830                 else
831                         oth->ot_dur = NULL;
832                 ptlrpc_req_finished(req);
833         }
834
835         RETURN(rc);
836 }
837
838 /**
839  * Get local thandle for osp_thandle
840  *
841  * Get the local OSD thandle from the OSP thandle. Currently, there
842  * are a few OSP API (osp_object_create() and osp_sync_add()) needs
843  * to update the object on local OSD device.
844  *
845  * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
846  * we will get local thandle by thandle_get_sub_by_dt.
847  *
848  * If the osp_thandle is remote thandle (th_top == NULL, only used
849  * by LFSCK), then it will create a local thandle, and stop it in
850  * osp_trans_stop(). And this only happens on OSP for OST.
851  *
852  * These are temporary solution, once OSP accessing OSD object is
853  * being fixed properly, this function should be removed. XXX
854  *
855  * \param[in] env               pointer to the thread context
856  * \param[in] th                pointer to the transaction handler
857  * \param[in] dt                pointer to the OSP device
858  *
859  * \retval                      pointer to the local thandle
860  * \retval                      ERR_PTR(errno) if it fails.
861  **/
862 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
863                                         struct thandle *th,
864                                         struct osp_device *osp)
865 {
866         struct osp_thandle      *oth;
867         struct thandle          *local_th;
868
869         if (th->th_top != NULL)
870                 return thandle_get_sub_by_dt(env, th->th_top,
871                                              osp->opd_storage);
872
873         LASSERT(!osp->opd_connect_mdt);
874         oth = thandle_to_osp_thandle(th);
875         if (oth->ot_storage_th != NULL)
876                 return oth->ot_storage_th;
877
878         local_th = dt_trans_create(env, osp->opd_storage);
879         if (IS_ERR(local_th))
880                 return local_th;
881
882         oth->ot_storage_th = local_th;
883
884         return local_th;
885 }
886
887 /**
888  * The OSP layer dt_device_operations::dt_trans_start() interface
889  * to start the transaction.
890  *
891  * If the transaction is a remote transaction, then related remote
892  * updates will be triggered in the osp_trans_stop().
893  * Please refer to osp_trans_create() for transaction type.
894  *
895  * \param[in] env               pointer to the thread context
896  * \param[in] dt                pointer to the OSP dt_device
897  * \param[in] th                pointer to the transaction handler
898  *
899  * \retval                      0 for success
900  * \retval                      negative error number on failure
901  */
902 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
903                     struct thandle *th)
904 {
905         struct osp_thandle      *oth = thandle_to_osp_thandle(th);
906
907         /* For remote thandle, if there are local thandle, start it here*/
908         if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
909                 return dt_trans_start(env, oth->ot_storage_th->th_dev,
910                                       oth->ot_storage_th);
911         return 0;
912 }
913
914 /**
915  * The OSP layer dt_device_operations::dt_trans_stop() interface
916  * to stop the transaction.
917  *
918  * If the transaction is a remote transaction, related remote
919  * updates will be triggered here via osp_trans_trigger().
920  *
921  * For synchronous mode update or any failed update, the request
922  * will be destroyed explicitly when the osp_trans_stop().
923  *
924  * Please refer to osp_trans_create() for transaction type.
925  *
926  * \param[in] env               pointer to the thread context
927  * \param[in] dt                pointer to the OSP dt_device
928  * \param[in] th                pointer to the transaction handler
929  *
930  * \retval                      0 for success
931  * \retval                      negative error number on failure
932  */
933 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
934                    struct thandle *th)
935 {
936         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
937         struct dt_update_request *dt_update;
938         int                      rc = 0;
939         int                      sent = 0;
940         ENTRY;
941
942         /* For remote transaction, if there is local storage thandle,
943          * stop it first */
944         if (oth->ot_storage_th != NULL && th->th_top == NULL) {
945                 dt_trans_stop(env, oth->ot_storage_th->th_dev,
946                               oth->ot_storage_th);
947                 oth->ot_storage_th = NULL;
948         }
949
950         dt_update = oth->ot_dur;
951         if (dt_update == NULL || th->th_result != 0) {
952                 rc = th->th_result;
953                 GOTO(out, rc);
954         }
955
956         LASSERT(dt_update != LP_POISON);
957
958         /* If there are no updates, destroy dt_update and thandle */
959         if (dt_update->dur_buf.ub_req == NULL ||
960             dt_update->dur_buf.ub_req->ourq_count == 0)
961                 GOTO(out, rc);
962
963         if (is_only_remote_trans(th) && !th->th_sync &&
964             !th->th_wait_submit) {
965                 struct osp_device *osp = dt2osp_dev(th->th_dev);
966                 struct client_obd *cli = &osp->opd_obd->u.cli;
967
968                 rc = obd_get_request_slot(cli);
969                 if (rc != 0)
970                         GOTO(out, rc);
971
972                 if (!osp->opd_imp_active || !osp->opd_imp_connected) {
973                         obd_put_request_slot(cli);
974                         GOTO(out, rc = -ENOTCONN);
975                 }
976
977                 rc = osp_trans_trigger(env, dt2osp_dev(dt),
978                                        dt_update, th, &sent);
979                 if (rc != 0)
980                         obd_put_request_slot(cli);
981         } else {
982                 rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
983                                        th, &sent);
984         }
985
986 out:
987         /* If RPC is triggered successfully, dt_update will be freed in
988          * osp_update_interpreter() */
989         if (sent == 0) {
990                 struct osp_update_callback *ouc;
991                 struct osp_update_callback *next;
992
993                 if (dt_update != NULL) {
994                         list_for_each_entry_safe(ouc, next,
995                                                  &dt_update->dur_cb_items,
996                                                  ouc_list) {
997                                 list_del_init(&ouc->ouc_list);
998                                 if (ouc->ouc_interpreter != NULL)
999                                         ouc->ouc_interpreter(env, NULL, NULL,
1000                                                            ouc->ouc_obj,
1001                                                            ouc->ouc_data, 0,
1002                                                            rc);
1003                                 osp_update_callback_fini(env, ouc);
1004                         }
1005                 }
1006                 osp_trans_commit_cb(oth, rc);
1007                 dt_update_request_destroy(dt_update);
1008                 oth->ot_dur = NULL;
1009         }
1010
1011         osp_thandle_put(oth);
1012
1013         RETURN(rc);
1014 }