Whamcloud - gitweb
c5065723fafb5258e69a0e183dbf427e6f9ea778
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into an osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * There are three kinds of transactions
57  *
58  * 1. Local transaction, all of updates of the transaction are in the local MDT.
59  * 2. Remote transaction, all of updates of the transaction are in one remote
60  * MDT, which only happens in LFSCK now.
61  * 3. Distribute transaction, updates for the transaction are in mulitple MDTs.
62  *
63  * Author: Di Wang <di.wang@intel.com>
64  * Author: Fan, Yong <fan.yong@intel.com>
65  */
66
67 #define DEBUG_SUBSYSTEM S_MDS
68
69 #include <lustre_net.h>
70 #include "osp_internal.h"
71
72 /**
73  * The argument for the interpreter callback of osp request.
74  */
75 struct osp_update_args {
76         struct osp_update_request *oaua_update;
77         atomic_t                 *oaua_count;
78         wait_queue_head_t        *oaua_waitq;
79         bool                      oaua_flow_control;
80 };
81
82 /**
83  * Call back for each update request.
84  */
85 struct osp_update_callback {
86         /* list in the osp_update_request::our_cb_items */
87         struct list_head                 ouc_list;
88
89         /* The target of the async update request. */
90         struct osp_object               *ouc_obj;
91
92         /* The data used by or_interpreter. */
93         void                            *ouc_data;
94
95         /* The interpreter function called after the async request handled. */
96         osp_update_interpreter_t        ouc_interpreter;
97 };
98
99 static struct object_update_request *object_update_request_alloc(size_t size)
100 {
101         struct object_update_request *ourq;
102
103         OBD_ALLOC_LARGE(ourq, size);
104         if (ourq == NULL)
105                 return ERR_PTR(-ENOMEM);
106
107         ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
108         ourq->ourq_count = 0;
109
110         return ourq;
111 }
112
113 /**
114  * Allocate new update request
115  *
116  * Allocate new update request and insert it to the req_update_list.
117  *
118  * \param [in] our      osp_udate_request where to create a new
119  *                      update request
120  *
121  * \retval      0 if creation succeeds.
122  * \retval      negative errno if creation fails.
123  */
124 int osp_object_update_request_create(struct osp_update_request *our,
125                                      size_t size)
126 {
127         struct osp_update_request_sub *ours;
128
129         OBD_ALLOC_PTR(ours);
130         if (ours == NULL)
131                 return -ENOMEM;
132
133         if (size < OUT_UPDATE_INIT_BUFFER_SIZE)
134                 size = OUT_UPDATE_INIT_BUFFER_SIZE;
135
136         ours->ours_req = object_update_request_alloc(size);
137
138         if (IS_ERR(ours->ours_req)) {
139                 OBD_FREE_PTR(ours);
140                 return -ENOMEM;
141         }
142
143         ours->ours_req_size = size;
144         INIT_LIST_HEAD(&ours->ours_list);
145         list_add_tail(&ours->ours_list, &our->our_req_list);
146
147         return 0;
148 }
149
150 /**
151  * Get current update request
152  *
153  * Get current object update request from our_req_list in
154  * osp_update_request, because we always insert the new update
155  * request in the last position, so the last update request
156  * in the list will be the current update req.
157  *
158  * \param[in] our       osp update request where to get the
159  *                      current object update.
160  *
161  * \retval              the current object update.
162  **/
163 struct osp_update_request_sub *
164 osp_current_object_update_request(struct osp_update_request *our)
165 {
166         if (list_empty(&our->our_req_list))
167                 return NULL;
168
169         return list_entry(our->our_req_list.prev, struct osp_update_request_sub,
170                           ours_list);
171 }
172
173 /**
174  * Allocate and initialize osp_update_request
175  *
176  * osp_update_request is being used to track updates being executed on
177  * this dt_device(OSD or OSP). The update buffer will be 4k initially,
178  * and increased if needed.
179  *
180  * \param [in] dt       dt device
181  *
182  * \retval              osp_update_request being allocated if succeed
183  * \retval              ERR_PTR(errno) if failed
184  */
185 struct osp_update_request *osp_update_request_create(struct dt_device *dt)
186 {
187         struct osp_update_request *our;
188
189         OBD_ALLOC_PTR(our);
190         if (our == NULL)
191                 return ERR_PTR(-ENOMEM);
192
193         INIT_LIST_HEAD(&our->our_req_list);
194         INIT_LIST_HEAD(&our->our_cb_items);
195         INIT_LIST_HEAD(&our->our_list);
196
197         osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
198         return our;
199 }
200
201 void osp_update_request_destroy(struct osp_update_request *our)
202 {
203         struct osp_update_request_sub *ours;
204         struct osp_update_request_sub *tmp;
205
206         if (our == NULL)
207                 return;
208
209         list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) {
210                 list_del(&ours->ours_list);
211                 if (ours->ours_req != NULL)
212                         OBD_FREE(ours->ours_req, ours->ours_req_size);
213                 OBD_FREE_PTR(ours);
214         }
215         OBD_FREE_PTR(our);
216 }
217
218 static void
219 object_update_request_dump(const struct object_update_request *ourq,
220                            unsigned int mask)
221 {
222         unsigned int i;
223         size_t total_size = 0;
224
225         for (i = 0; i < ourq->ourq_count; i++) {
226                 struct object_update    *update;
227                 size_t                  size = 0;
228
229                 update = object_update_request_get(ourq, i, &size);
230                 LASSERT(update != NULL);
231                 CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u"
232                        "params = %d batchid = "LPU64" size = %zu\n",
233                        i, PFID(&update->ou_fid),
234                        update_op_str(update->ou_type),
235                        update->ou_master_index, update->ou_params_count,
236                        update->ou_batchid, size);
237
238                 total_size += size;
239         }
240
241         CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
242                ourq->ourq_magic, ourq->ourq_count, total_size);
243 }
244
245 /**
246  * Prepare inline update request
247  *
248  * Prepare OUT update ptlrpc inline request, and the request usually includes
249  * one update buffer, which does not need bulk transfer.
250  *
251  * \param[in] env       execution environment
252  * \param[in] req       ptlrpc request
253  * \param[in] ours      sub osp_update_request to be packed
254  *
255  * \retval              0 if packing succeeds
256  * \retval              negative errno if packing fails
257  */
258 int osp_prep_inline_update_req(const struct lu_env *env,
259                                struct ptlrpc_request *req,
260                                struct osp_update_request_sub *ours)
261 {
262         struct out_update_header *ouh;
263         __u32 update_req_size = object_update_request_size(ours->ours_req);
264         int rc;
265
266         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
267                              update_req_size + sizeof(*ouh));
268
269         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
270         if (rc != 0)
271                 RETURN(rc);
272
273         ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
274         ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
275         ouh->ouh_count = 1;
276         ouh->ouh_inline_length = update_req_size;
277
278         memcpy(ouh->ouh_inline_data, ours->ours_req, update_req_size);
279
280         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
281                              RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
282
283         ptlrpc_request_set_replen(req);
284         req->rq_request_portal = OUT_PORTAL;
285         req->rq_reply_portal = OSC_REPLY_PORTAL;
286
287         RETURN(rc);
288 }
289
290 /**
291  * Prepare update request.
292  *
293  * Prepare OUT update ptlrpc request, and the request usually includes
294  * all of updates (stored in \param ureq) from one operation.
295  *
296  * \param[in] env       execution environment
297  * \param[in] imp       import on which ptlrpc request will be sent
298  * \param[in] ureq      hold all of updates which will be packed into the req
299  * \param[in] reqp      request to be created
300  *
301  * \retval              0 if preparation succeeds.
302  * \retval              negative errno if preparation fails.
303  */
304 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
305                         struct osp_update_request *our,
306                         struct ptlrpc_request **reqp)
307 {
308         struct ptlrpc_request           *req;
309         struct ptlrpc_bulk_desc         *desc;
310         struct osp_update_request_sub   *ours;
311         struct out_update_header        *ouh;
312         struct out_update_buffer        *oub;
313         __u32                           buf_count = 0;
314         int                             rc;
315         ENTRY;
316
317         list_for_each_entry(ours, &our->our_req_list, ours_list) {
318                 object_update_request_dump(ours->ours_req, D_INFO);
319                 buf_count++;
320         }
321         LASSERT(buf_count > 0);
322
323         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
324         if (req == NULL)
325                 RETURN(-ENOMEM);
326
327         if (buf_count == 1) {
328                 ours = list_entry(our->our_req_list.next,
329                                   struct osp_update_request_sub, ours_list);
330
331                 /* Let's check if it can be packed inline */
332                 if (object_update_request_size(ours->ours_req) +
333                     sizeof(struct out_update_header) <
334                                 OUT_UPDATE_MAX_INLINE_SIZE) {
335                         rc = osp_prep_inline_update_req(env, req, ours);
336                         if (rc == 0)
337                                 *reqp = req;
338                         GOTO(out_req, rc);
339                 }
340         }
341
342         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
343                              sizeof(struct osp_update_request));
344
345         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
346                              buf_count * sizeof(*oub));
347
348         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
349         if (rc != 0)
350                 GOTO(out_req, rc);
351
352         ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
353         ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
354         ouh->ouh_count = buf_count;
355         ouh->ouh_inline_length = 0;
356         oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
357         list_for_each_entry(ours, &our->our_req_list, ours_list) {
358                 oub->oub_size = ours->ours_req_size;
359                 oub++;
360         }
361
362         req->rq_bulk_write = 1;
363         desc = ptlrpc_prep_bulk_imp(req, buf_count,
364                 MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
365                 PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
366                 MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
367         if (desc == NULL)
368                 GOTO(out_req, rc = -ENOMEM);
369
370         /* NB req now owns desc and will free it when it gets freed */
371         list_for_each_entry(ours, &our->our_req_list, ours_list)
372                 desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
373                                                 ours->ours_req_size);
374
375         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
376                              RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
377
378         ptlrpc_request_set_replen(req);
379         req->rq_request_portal = OUT_PORTAL;
380         req->rq_reply_portal = OSC_REPLY_PORTAL;
381         *reqp = req;
382
383 out_req:
384         if (rc < 0)
385                 ptlrpc_req_finished(req);
386
387         RETURN(rc);
388 }
389
390 /**
391  * Send update RPC.
392  *
393  * Send update request to the remote MDT synchronously.
394  *
395  * \param[in] env       execution environment
396  * \param[in] imp       import on which ptlrpc request will be sent
397  * \param[in] our       hold all of updates which will be packed into the req
398  * \param[in] reqp      request to be created
399  *
400  * \retval              0 if RPC succeeds.
401  * \retval              negative errno if RPC fails.
402  */
403 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
404                     struct osp_update_request *our,
405                     struct ptlrpc_request **reqp)
406 {
407         struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
408         struct ptlrpc_request   *req = NULL;
409         int                     rc;
410         ENTRY;
411
412         rc = osp_prep_update_req(env, imp, our, &req);
413         if (rc != 0)
414                 RETURN(rc);
415
416         /* This will only be called with read-only update, and these updates
417          * might be used to retrieve update log during recovery process, so
418          * it will be allowed to send during recovery process */
419         req->rq_allow_replay = 1;
420
421         /* Note: some dt index api might return non-zero result here, like
422          * osd_index_ea_lookup, so we should only check rc < 0 here */
423         rc = ptlrpc_queue_wait(req);
424         our->our_rc = rc;
425         if (rc < 0 || reqp == NULL)
426                 ptlrpc_req_finished(req);
427         else
428                 *reqp = req;
429
430         RETURN(rc);
431 }
432
433 static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
434 {
435         struct dt_txn_commit_cb *dcb;
436         struct dt_txn_commit_cb *tmp;
437
438         /* call per-transaction stop callbacks if any */
439         list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
440                                  dcb_linkage) {
441                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
442                          "commit callback entry: magic=%x name='%s'\n",
443                          dcb->dcb_magic, dcb->dcb_name);
444                 list_del_init(&dcb->dcb_linkage);
445                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
446         }
447 }
448
449 /**
450  * Allocate an osp request and initialize it with the given parameters.
451  *
452  * \param[in] obj               pointer to the operation target
453  * \param[in] data              pointer to the data used by the interpreter
454  * \param[in] interpreter       pointer to the interpreter function
455  *
456  * \retval                      pointer to the asychronous request
457  * \retval                      NULL if the allocation failed
458  */
459 static struct osp_update_callback *
460 osp_update_callback_init(struct osp_object *obj, void *data,
461                          osp_update_interpreter_t interpreter)
462 {
463         struct osp_update_callback *ouc;
464
465         OBD_ALLOC_PTR(ouc);
466         if (ouc == NULL)
467                 return NULL;
468
469         lu_object_get(osp2lu_obj(obj));
470         INIT_LIST_HEAD(&ouc->ouc_list);
471         ouc->ouc_obj = obj;
472         ouc->ouc_data = data;
473         ouc->ouc_interpreter = interpreter;
474
475         return ouc;
476 }
477
478 /**
479  * Destroy the osp_update_callback.
480  *
481  * \param[in] env       pointer to the thread context
482  * \param[in] ouc       pointer to osp_update_callback
483  */
484 static void osp_update_callback_fini(const struct lu_env *env,
485                                      struct osp_update_callback *ouc)
486 {
487         LASSERT(list_empty(&ouc->ouc_list));
488
489         lu_object_put(env, osp2lu_obj(ouc->ouc_obj));
490         OBD_FREE_PTR(ouc);
491 }
492
493 /**
494  * Interpret the packaged OUT RPC results.
495  *
496  * For every packaged sub-request, call its registered interpreter function.
497  * Then destroy the sub-request.
498  *
499  * \param[in] env       pointer to the thread context
500  * \param[in] req       pointer to the RPC
501  * \param[in] arg       pointer to data used by the interpreter
502  * \param[in] rc        the RPC return value
503  *
504  * \retval              0 for success
505  * \retval              negative error number on failure
506  */
507 static int osp_update_interpret(const struct lu_env *env,
508                                 struct ptlrpc_request *req, void *arg, int rc)
509 {
510         struct object_update_reply      *reply  = NULL;
511         struct osp_update_args          *oaua   = arg;
512         struct osp_update_request       *our = oaua->oaua_update;
513         struct osp_thandle              *oth;
514         struct osp_update_callback      *ouc;
515         struct osp_update_callback      *next;
516         int                              count  = 0;
517         int                              index  = 0;
518         int                              rc1    = 0;
519
520         ENTRY;
521
522         if (our == NULL)
523                 RETURN(0);
524
525         oaua->oaua_update = NULL;
526         oth = our->our_th;
527         if (oaua->oaua_flow_control) {
528                 struct osp_device *osp;
529
530                 LASSERT(oth != NULL);
531                 osp = dt2osp_dev(oth->ot_super.th_dev);
532                 obd_put_request_slot(&osp->opd_obd->u.cli);
533         }
534
535         /* Unpack the results from the reply message. */
536         if (req->rq_repmsg != NULL) {
537                 reply = req_capsule_server_sized_get(&req->rq_pill,
538                                                      &RMF_OUT_UPDATE_REPLY,
539                                                      OUT_UPDATE_REPLY_SIZE);
540                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
541                         rc1 = -EPROTO;
542                 else
543                         count = reply->ourp_count;
544         } else {
545                 rc1 = rc;
546         }
547
548         list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
549                 list_del_init(&ouc->ouc_list);
550
551                 /* The peer may only have handled some requests (indicated
552                  * by the 'count') in the packaged OUT RPC, we can only get
553                  * results for the handled part. */
554                 if (index < count && reply->ourp_lens[index] > 0) {
555                         struct object_update_result *result;
556
557                         result = object_update_result_get(reply, index, NULL);
558                         if (result == NULL)
559                                 rc1 = -EPROTO;
560                         else
561                                 rc1 = result->our_rc;
562                 } else {
563                         rc1 = rc;
564                         if (unlikely(rc1 == 0))
565                                 rc1 = -EINVAL;
566                 }
567
568                 if (ouc->ouc_interpreter != NULL)
569                         ouc->ouc_interpreter(env, reply, req, ouc->ouc_obj,
570                                              ouc->ouc_data, index, rc1);
571
572                 osp_update_callback_fini(env, ouc);
573                 index++;
574         }
575
576         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
577                 wake_up_all(oaua->oaua_waitq);
578
579         if (oth != NULL) {
580                 /* oth and osp_update_requests will be destoryed in
581                  * osp_thandle_put */
582                 osp_trans_stop_cb(oth, rc);
583                 osp_thandle_put(oth);
584         } else {
585                 osp_update_request_destroy(our);
586         }
587
588         RETURN(0);
589 }
590
591 /**
592  * Pack all the requests in the shared asynchronous idempotent request queue
593  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
594  *
595  * \param[in] env       pointer to the thread context
596  * \param[in] osp       pointer to the OSP device
597  * \param[in] our       pointer to the shared queue
598  *
599  * \retval              0 for success
600  * \retval              negative error number on failure
601  */
602 int osp_unplug_async_request(const struct lu_env *env,
603                              struct osp_device *osp,
604                              struct osp_update_request *our)
605 {
606         struct osp_update_args  *args;
607         struct ptlrpc_request   *req = NULL;
608         int                      rc;
609
610         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
611                                  our, &req);
612         if (rc != 0) {
613                 struct osp_update_callback *ouc;
614                 struct osp_update_callback *next;
615
616                 list_for_each_entry_safe(ouc, next,
617                                          &our->our_cb_items, ouc_list) {
618                         list_del_init(&ouc->ouc_list);
619                         if (ouc->ouc_interpreter != NULL)
620                                 ouc->ouc_interpreter(env, NULL, NULL,
621                                                      ouc->ouc_obj,
622                                                      ouc->ouc_data, 0, rc);
623                         osp_update_callback_fini(env, ouc);
624                 }
625                 osp_update_request_destroy(our);
626         } else {
627                 args = ptlrpc_req_async_args(req);
628                 args->oaua_update = our;
629                 args->oaua_count = NULL;
630                 args->oaua_waitq = NULL;
631                 args->oaua_flow_control = false;
632                 req->rq_interpret_reply = osp_update_interpret;
633                 ptlrpcd_add_req(req);
634         }
635
636         return rc;
637 }
638
639 /**
640  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
641  * request queue - osp_device::opd_async_requests.
642  *
643  * If the osp_device::opd_async_requests is not NULL, then return it directly;
644  * otherwise create new osp_update_request and attach it to opd_async_requests.
645  *
646  * \param[in] osp       pointer to the OSP device
647  *
648  * \retval              pointer to the shared queue
649  * \retval              negative error number on failure
650  */
651 static struct osp_update_request *
652 osp_find_or_create_async_update_request(struct osp_device *osp)
653 {
654         struct osp_update_request *our = osp->opd_async_requests;
655
656         if (our != NULL)
657                 return our;
658
659         our = osp_update_request_create(&osp->opd_dt_dev);
660         if (IS_ERR(our))
661                 return our;
662
663         osp->opd_async_requests = our;
664
665         return our;
666 }
667
668 /**
669  * Insert an osp_update_callback into the osp_update_request.
670  *
671  * Insert an osp_update_callback to the osp_update_request. Usually each update
672  * in the osp_update_request will have one correspondent callback, and these
673  * callbacks will be called in rq_interpret_reply.
674  *
675  * \param[in] env               pointer to the thread context
676  * \param[in] obj               pointer to the operation target object
677  * \param[in] data              pointer to the data used by the interpreter
678  * \param[in] interpreter       pointer to the interpreter function
679  *
680  * \retval                      0 for success
681  * \retval                      negative error number on failure
682  */
683 int osp_insert_update_callback(const struct lu_env *env,
684                                struct osp_update_request *our,
685                                struct osp_object *obj, void *data,
686                                osp_update_interpreter_t interpreter)
687 {
688         struct osp_update_callback  *ouc;
689
690         ouc = osp_update_callback_init(obj, data, interpreter);
691         if (ouc == NULL)
692                 RETURN(-ENOMEM);
693
694         list_add_tail(&ouc->ouc_list, &our->our_cb_items);
695
696         return 0;
697 }
698
699 /**
700  * Insert an asynchronous idempotent request to the shared request queue that
701  * is attached to the osp_device.
702  *
703  * This function generates a new osp_async_request with the given parameters,
704  * then tries to insert the request into the osp_device-based shared request
705  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
706  * the shared queue firstly, and then re-tries.
707  *
708  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
709  *       osp_insert_async_request call from others.
710  *
711  * \param[in] env               pointer to the thread context
712  * \param[in] op                operation type, see 'enum update_type'
713  * \param[in] obj               pointer to the operation target
714  * \param[in] count             array size of the subsequent \a lens and \a bufs
715  * \param[in] lens              buffer length array for the subsequent \a bufs
716  * \param[in] bufs              the buffers to compose the request
717  * \param[in] data              pointer to the data used by the interpreter
718  * \param[in] interpreter       pointer to the interpreter function
719  *
720  * \retval                      0 for success
721  * \retval                      negative error number on failure
722  */
723 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
724                              struct osp_object *obj, int count,
725                              __u16 *lens, const void **bufs, void *data,
726                              osp_update_interpreter_t interpreter)
727 {
728         struct osp_device               *osp;
729         struct osp_update_request       *our;
730         struct object_update            *object_update;
731         size_t                          max_update_size;
732         struct object_update_request    *ureq;
733         struct osp_update_request_sub   *ours;
734         int                             rc = 0;
735         ENTRY;
736
737         osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
738         our = osp_find_or_create_async_update_request(osp);
739         if (IS_ERR(our))
740                 RETURN(PTR_ERR(our));
741
742 again:
743         ours = osp_current_object_update_request(our);
744
745         ureq = ours->ours_req;
746         max_update_size = ours->ours_req_size -
747                           object_update_request_size(ureq);
748
749         object_update = update_buffer_get_update(ureq, ureq->ourq_count);
750         rc = out_update_pack(env, object_update, &max_update_size, op,
751                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
752         /* The queue is full. */
753         if (rc == -E2BIG) {
754                 osp->opd_async_requests = NULL;
755                 mutex_unlock(&osp->opd_async_requests_mutex);
756
757                 rc = osp_unplug_async_request(env, osp, our);
758                 mutex_lock(&osp->opd_async_requests_mutex);
759                 if (rc != 0)
760                         RETURN(rc);
761
762                 our = osp_find_or_create_async_update_request(osp);
763                 if (IS_ERR(our))
764                         RETURN(PTR_ERR(our));
765
766                 goto again;
767         } else {
768                 if (rc < 0)
769                         RETURN(rc);
770
771                 ureq->ourq_count++;
772         }
773
774         rc = osp_insert_update_callback(env, our, obj, data, interpreter);
775
776         RETURN(rc);
777 }
778
779 int osp_trans_update_request_create(struct thandle *th)
780 {
781         struct osp_thandle              *oth = thandle_to_osp_thandle(th);
782         struct osp_update_request       *our;
783
784         if (oth->ot_our != NULL)
785                 return 0;
786
787         our = osp_update_request_create(th->th_dev);
788         if (IS_ERR(our)) {
789                 th->th_result = PTR_ERR(our);
790                 return PTR_ERR(our);
791         }
792
793         oth->ot_our = our;
794         our->our_th = oth;
795
796         return 0;
797 }
798
799 void osp_thandle_destroy(struct osp_thandle *oth)
800 {
801         LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
802         LASSERT(list_empty(&oth->ot_commit_dcb_list));
803         LASSERT(list_empty(&oth->ot_stop_dcb_list));
804         if (oth->ot_our != NULL)
805                 osp_update_request_destroy(oth->ot_our);
806         OBD_FREE_PTR(oth);
807 }
808
809 /**
810  * The OSP layer dt_device_operations::dt_trans_create() interface
811  * to create a transaction.
812  *
813  * There are two kinds of transactions that will involve OSP:
814  *
815  * 1) If the transaction only contains the updates on remote server
816  *    (MDT or OST), such as re-generating the lost OST-object for
817  *    LFSCK, then it is a remote transaction. For remote transaction,
818  *    the upper layer caller (such as the LFSCK engine) will call the
819  *    dt_trans_create() (with the OSP dt_device as the parameter),
820  *    then the call will be directed to the osp_trans_create() that
821  *    creates the transaction handler and returns it to the caller.
822  *
823  * 2) If the transcation contains both local and remote updates,
824  *    such as cross MDTs create under DNE mode, then the upper layer
825  *    caller will not trigger osp_trans_create(). Instead, it will
826  *    call dt_trans_create() on other dt_device, such as LOD that
827  *    will generate the transaction handler. Such handler will be
828  *    used by the whole transaction in subsequent sub-operations.
829  *
830  * \param[in] env       pointer to the thread context
831  * \param[in] d         pointer to the OSP dt_device
832  *
833  * \retval              pointer to the transaction handler
834  * \retval              negative error number on failure
835  */
836 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
837 {
838         struct osp_thandle              *oth;
839         struct thandle                  *th = NULL;
840         ENTRY;
841
842         OBD_ALLOC_PTR(oth);
843         if (unlikely(oth == NULL))
844                 RETURN(ERR_PTR(-ENOMEM));
845
846         oth->ot_magic = OSP_THANDLE_MAGIC;
847         th = &oth->ot_super;
848         th->th_dev = d;
849         th->th_tags = LCT_TX_HANDLE;
850
851         atomic_set(&oth->ot_refcount, 1);
852         INIT_LIST_HEAD(&oth->ot_commit_dcb_list);
853         INIT_LIST_HEAD(&oth->ot_stop_dcb_list);
854
855         RETURN(th);
856 }
857
858 /**
859  * Add commit callback to transaction.
860  *
861  * Add commit callback to the osp thandle, which will be called
862  * when the thandle is committed remotely.
863  *
864  * \param[in] th        the thandle
865  * \param[in] dcb       commit callback structure
866  *
867  * \retval              only return 0 for now.
868  */
869 int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
870 {
871         struct osp_thandle *oth = thandle_to_osp_thandle(th);
872
873         LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
874         LASSERT(&dcb->dcb_func != NULL);
875         if (dcb->dcb_flags & DCB_TRANS_STOP)
876                 list_add(&dcb->dcb_linkage, &oth->ot_stop_dcb_list);
877         else
878                 list_add(&dcb->dcb_linkage, &oth->ot_commit_dcb_list);
879         return 0;
880 }
881
882 static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
883 {
884         struct dt_txn_commit_cb *dcb;
885         struct dt_txn_commit_cb *tmp;
886
887         LASSERT(atomic_read(&oth->ot_refcount) > 0);
888         /* call per-transaction callbacks if any */
889         list_for_each_entry_safe(dcb, tmp, &oth->ot_commit_dcb_list,
890                                  dcb_linkage) {
891                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
892                          "commit callback entry: magic=%x name='%s'\n",
893                          dcb->dcb_magic, dcb->dcb_name);
894                 list_del_init(&dcb->dcb_linkage);
895                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
896         }
897 }
898
899 static void osp_request_commit_cb(struct ptlrpc_request *req)
900 {
901         struct thandle          *th = req->rq_cb_data;
902         struct osp_thandle      *oth;
903         __u64                   last_committed_transno = 0;
904         int                     result = req->rq_status;
905         ENTRY;
906
907         if (th == NULL)
908                 RETURN_EXIT;
909
910         oth = thandle_to_osp_thandle(th);
911         if (lustre_msg_get_last_committed(req->rq_repmsg))
912                 last_committed_transno =
913                         lustre_msg_get_last_committed(req->rq_repmsg);
914
915         if (last_committed_transno <
916                 req->rq_import->imp_peer_committed_transno)
917                 last_committed_transno =
918                         req->rq_import->imp_peer_committed_transno;
919
920         CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n",
921                req->rq_transno, last_committed_transno);
922
923         /* If the transaction is not really committed, mark result = 1 */
924         if (req->rq_transno != 0 &&
925             (req->rq_transno > last_committed_transno) && result == 0)
926                 result = 1;
927
928         osp_trans_commit_cb(oth, result);
929         req->rq_committed = 1;
930         osp_thandle_put(oth);
931         EXIT;
932 }
933
934 /**
935  * callback of osp transaction
936  *
937  * Call all of callbacks for this osp thandle. This will only be
938  * called in error handler path. In the normal processing path,
939  * these callback will be called in osp_request_commit_cb() and
940  * osp_update_interpret().
941  *
942  * \param [in] env      execution environment
943  * \param [in] oth      osp thandle
944  * \param [in] rc       result of the osp thandle
945  */
946 void osp_trans_callback(const struct lu_env *env,
947                         struct osp_thandle *oth, int rc)
948 {
949         struct osp_update_callback *ouc;
950         struct osp_update_callback *next;
951
952         if (oth->ot_our != NULL) {
953                 list_for_each_entry_safe(ouc, next,
954                                          &oth->ot_our->our_cb_items, ouc_list) {
955                         list_del_init(&ouc->ouc_list);
956                         if (ouc->ouc_interpreter != NULL)
957                                 ouc->ouc_interpreter(env, NULL, NULL,
958                                                      ouc->ouc_obj,
959                                                      ouc->ouc_data, 0, rc);
960                         osp_update_callback_fini(env, ouc);
961                 }
962         }
963         osp_trans_stop_cb(oth, rc);
964         osp_trans_commit_cb(oth, rc);
965 }
966
967 /**
968  * Send the request for remote updates.
969  *
970  * Send updates to the remote MDT. Prepare the request by osp_update_req
971  * and send them to remote MDT, for sync request, it will wait
972  * until the reply return, otherwise hand it to ptlrpcd.
973  *
974  * Please refer to osp_trans_create() for transaction type.
975  *
976  * \param[in] env               pointer to the thread context
977  * \param[in] osp               pointer to the OSP device
978  * \param[in] our               pointer to the osp_update_request
979  *
980  * \retval                      0 for success
981  * \retval                      negative error number on failure
982  */
983 static int osp_send_update_req(const struct lu_env *env,
984                                struct osp_device *osp,
985                                struct osp_update_request *our)
986 {
987         struct osp_update_args  *args;
988         struct ptlrpc_request   *req;
989         struct lu_device *top_device;
990         struct osp_thandle      *oth = our->our_th;
991         int     rc = 0;
992         ENTRY;
993
994         LASSERT(oth != NULL);
995         LASSERT(our->our_req_sent == 0);
996         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
997                                  our, &req);
998         if (rc != 0) {
999                 osp_trans_callback(env, oth, rc);
1000                 RETURN(rc);
1001         }
1002
1003         args = ptlrpc_req_async_args(req);
1004         args->oaua_update = our;
1005         osp_thandle_get(oth); /* hold for update interpret */
1006         req->rq_interpret_reply = osp_update_interpret;
1007         if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) {
1008                 if (!osp->opd_imp_active || !osp->opd_imp_connected) {
1009                         osp_trans_callback(env, oth, rc);
1010                         osp_thandle_put(oth);
1011                         GOTO(out, rc = -ENOTCONN);
1012                 }
1013
1014                 rc = obd_get_request_slot(&osp->opd_obd->u.cli);
1015                 if (rc != 0) {
1016                         osp_trans_callback(env, oth, rc);
1017                         osp_thandle_put(oth);
1018                         GOTO(out, rc = -ENOTCONN);
1019                 }
1020                 args->oaua_flow_control = true;
1021
1022                 if (!osp->opd_connect_mdt) {
1023                         down_read(&osp->opd_async_updates_rwsem);
1024                         args->oaua_count = &osp->opd_async_updates_count;
1025                         args->oaua_waitq = &osp->opd_syn_barrier_waitq;
1026                         up_read(&osp->opd_async_updates_rwsem);
1027                         atomic_inc(args->oaua_count);
1028                 }
1029
1030                 ptlrpcd_add_req(req);
1031                 req = NULL;
1032         } else {
1033                 osp_thandle_get(oth); /* hold for commit callback */
1034                 req->rq_commit_cb = osp_request_commit_cb;
1035                 req->rq_cb_data = &oth->ot_super;
1036                 args->oaua_flow_control = false;
1037
1038                 /* If the transaction is created during MDT recoverying
1039                  * process, it means this is an recovery update, we need
1040                  * to let OSP send it anyway without checking recoverying
1041                  * status, in case the other target is being recoveried
1042                  * at the same time, and if we wait here for the import
1043                  * to be recoveryed, it might cause deadlock */
1044                 top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
1045                 if (top_device->ld_obd->obd_recovering)
1046                         req->rq_allow_replay = 1;
1047
1048                 if (osp->opd_connect_mdt)
1049                         osp_get_rpc_lock(osp);
1050                 rc = ptlrpc_queue_wait(req);
1051                 if (osp->opd_connect_mdt)
1052                         osp_put_rpc_lock(osp);
1053                 if ((rc == -ENOMEM && req->rq_set == NULL) ||
1054                     (req->rq_transno == 0 && !req->rq_committed)) {
1055                         if (args->oaua_update != NULL) {
1056                                 /* If osp_update_interpret is not being called,
1057                                  * release the osp_thandle */
1058                                 args->oaua_update = NULL;
1059                                 osp_thandle_put(oth);
1060                         }
1061
1062                         req->rq_cb_data = NULL;
1063                         rc = rc == 0 ? req->rq_status : rc;
1064                         osp_trans_callback(env, oth, rc);
1065                         osp_thandle_put(oth);
1066                         GOTO(out, rc);
1067                 }
1068         }
1069 out:
1070         if (req != NULL)
1071                 ptlrpc_req_finished(req);
1072
1073         RETURN(rc);
1074 }
1075
1076 /**
1077  * Get local thandle for osp_thandle
1078  *
1079  * Get the local OSD thandle from the OSP thandle. Currently, there
1080  * are a few OSP API (osp_object_create() and osp_sync_add()) needs
1081  * to update the object on local OSD device.
1082  *
1083  * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
1084  * we will get local thandle by thandle_get_sub_by_dt.
1085  *
1086  * If the osp_thandle is remote thandle (th_top == NULL, only used
1087  * by LFSCK), then it will create a local thandle, and stop it in
1088  * osp_trans_stop(). And this only happens on OSP for OST.
1089  *
1090  * These are temporary solution, once OSP accessing OSD object is
1091  * being fixed properly, this function should be removed. XXX
1092  *
1093  * \param[in] env               pointer to the thread context
1094  * \param[in] th                pointer to the transaction handler
1095  * \param[in] dt                pointer to the OSP device
1096  *
1097  * \retval                      pointer to the local thandle
1098  * \retval                      ERR_PTR(errno) if it fails.
1099  **/
1100 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
1101                                         struct thandle *th,
1102                                         struct osp_device *osp)
1103 {
1104         struct osp_thandle      *oth;
1105         struct thandle          *local_th;
1106
1107         if (th->th_top != NULL)
1108                 return thandle_get_sub_by_dt(env, th->th_top,
1109                                              osp->opd_storage);
1110
1111         LASSERT(!osp->opd_connect_mdt);
1112         oth = thandle_to_osp_thandle(th);
1113         if (oth->ot_storage_th != NULL)
1114                 return oth->ot_storage_th;
1115
1116         local_th = dt_trans_create(env, osp->opd_storage);
1117         if (IS_ERR(local_th))
1118                 return local_th;
1119
1120         oth->ot_storage_th = local_th;
1121
1122         return local_th;
1123 }
1124
1125 /**
1126  * Set version for the transaction
1127  *
1128  * Set the version for the transaction, then the osp RPC will be
1129  * sent in the order of version, i.e. the transaction with lower
1130  * version will be sent first.
1131  *
1132  * \param [in] oth      osp thandle to be set version.
1133  *
1134  * \retval              0 if set version succeeds
1135  *                      negative errno if set version fails.
1136  */
1137 int osp_check_and_set_rpc_version(struct osp_thandle *oth)
1138 {
1139         struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
1140         struct osp_updates *ou = osp->opd_update;
1141
1142         if (ou == NULL)
1143                 return -EIO;
1144
1145         if (oth->ot_version != 0)
1146                 return 0;
1147
1148         spin_lock(&ou->ou_lock);
1149         oth->ot_version = ou->ou_version++;
1150         spin_unlock(&ou->ou_lock);
1151
1152         CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
1153                osp->opd_obd->obd_name, ou->ou_version, oth, oth->ot_version);
1154
1155         return 0;
1156 }
1157
1158 /**
1159  * Get next OSP update request in the sending list
1160  * Get next OSP update request in the sending list by version number, next
1161  * request will be
1162  * 1. transaction which does not have a version number.
1163  * 2. transaction whose version == opd_rpc_version.
1164  *
1165  * \param [in] ou       osp update structure.
1166  * \param [out] ourp    the pointer holding the next update request.
1167  *
1168  * \retval              true if getting the next transaction.
1169  * \retval              false if not getting the next transaction.
1170  */
1171 static bool
1172 osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp)
1173 {
1174         struct osp_update_request *our;
1175         struct osp_update_request *tmp;
1176         bool                    got_req = false;
1177
1178         spin_lock(&ou->ou_lock);
1179         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
1180                 LASSERT(our->our_th != NULL);
1181                 CDEBUG(D_INFO, "our %p version "LPU64" rpc_version "LPU64"\n",
1182                        our, our->our_th->ot_version, ou->ou_rpc_version);
1183                 if (our->our_th->ot_version == 0) {
1184                         list_del_init(&our->our_list);
1185                         *ourp = our;
1186                         got_req = true;
1187                         break;
1188                 }
1189
1190                 /* Find next osp_update_request in the list */
1191                 if (our->our_th->ot_version == ou->ou_rpc_version) {
1192                         list_del_init(&our->our_list);
1193                         *ourp = our;
1194                         got_req = true;
1195                         break;
1196                 }
1197         }
1198         spin_unlock(&ou->ou_lock);
1199
1200         return got_req;
1201 }
1202
1203 static void osp_update_rpc_version(struct osp_updates *ou,
1204                                    struct osp_thandle *oth)
1205 {
1206         if (oth->ot_version == 0)
1207                 return;
1208
1209         LASSERT(oth->ot_version == ou->ou_rpc_version);
1210         spin_lock(&ou->ou_lock);
1211         ou->ou_rpc_version++;
1212         spin_unlock(&ou->ou_lock);
1213 }
1214
1215 /**
1216  * Sending update thread
1217  *
1218  * Create thread to send update request to other MDTs, this thread will pull
1219  * out update request from the list in OSP by version number, i.e. it will
1220  * make sure the update request with lower version number will be sent first.
1221  *
1222  * \param[in] arg       hold the OSP device.
1223  *
1224  * \retval              0 if the thread is created successfully.
1225  * \retal               negative error if the thread is not created
1226  *                      successfully.
1227  */
1228 int osp_send_update_thread(void *arg)
1229 {
1230         struct lu_env           env;
1231         struct osp_device       *osp = arg;
1232         struct l_wait_info       lwi = { 0 };
1233         struct osp_updates      *ou = osp->opd_update;
1234         struct ptlrpc_thread    *thread = &osp->opd_update_thread;
1235         struct osp_update_request *our = NULL;
1236         int                     rc;
1237         ENTRY;
1238
1239         LASSERT(ou != NULL);
1240         rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
1241         if (rc < 0) {
1242                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
1243                        rc);
1244                 RETURN(rc);
1245         }
1246
1247         thread->t_flags = SVC_RUNNING;
1248         wake_up(&thread->t_ctl_waitq);
1249         while (1) {
1250                 our = NULL;
1251                 l_wait_event(ou->ou_waitq,
1252                              !osp_send_update_thread_running(osp) ||
1253                              osp_get_next_request(ou, &our),
1254                              &lwi);
1255
1256                 if (!osp_send_update_thread_running(osp)) {
1257                         if (our != NULL && our->our_th != NULL) {
1258                                 osp_trans_callback(&env, our->our_th, -EINTR);
1259                                 osp_thandle_put(our->our_th);
1260                         }
1261                         break;
1262                 }
1263
1264                 if (our->our_req_sent == 0) {
1265                         if (our->our_th != NULL &&
1266                             our->our_th->ot_super.th_result != 0)
1267                                 osp_trans_callback(&env, our->our_th,
1268                                         our->our_th->ot_super.th_result);
1269                         else
1270                                 rc = osp_send_update_req(&env, osp, our);
1271                 }
1272
1273                 if (our->our_th != NULL) {
1274                         /* Update the rpc version */
1275                         osp_update_rpc_version(ou, our->our_th);
1276                         /* Balanced for thandle_get in osp_trans_trigger() */
1277                         osp_thandle_put(our->our_th);
1278                 }
1279         }
1280
1281         thread->t_flags = SVC_STOPPED;
1282         lu_env_fini(&env);
1283         wake_up(&thread->t_ctl_waitq);
1284
1285         RETURN(0);
1286 }
1287
1288 /**
1289  * Trigger the request for remote updates.
1290  *
1291  * Add the request to the sending list, and wake up osp update
1292  * sending thread.
1293  *
1294  * \param[in] env               pointer to the thread context
1295  * \param[in] osp               pointer to the OSP device
1296  * \param[in] oth               pointer to the transaction handler
1297  *
1298  */
1299 static void osp_trans_trigger(const struct lu_env *env,
1300                              struct osp_device *osp,
1301                              struct osp_thandle *oth)
1302 {
1303
1304         CDEBUG(D_INFO, "%s: add oth %p with version "LPU64"\n",
1305                osp->opd_obd->obd_name, oth, oth->ot_version);
1306
1307         LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
1308         osp_thandle_get(oth);
1309         LASSERT(oth->ot_our != NULL);
1310         spin_lock(&osp->opd_update->ou_lock);
1311         list_add_tail(&oth->ot_our->our_list,
1312                       &osp->opd_update->ou_list);
1313         spin_unlock(&osp->opd_update->ou_lock);
1314
1315         wake_up(&osp->opd_update->ou_waitq);
1316 }
1317
1318 /**
1319  * The OSP layer dt_device_operations::dt_trans_start() interface
1320  * to start the transaction.
1321  *
1322  * If the transaction is a remote transaction, then related remote
1323  * updates will be triggered in the osp_trans_stop().
1324  * Please refer to osp_trans_create() for transaction type.
1325  *
1326  * \param[in] env               pointer to the thread context
1327  * \param[in] dt                pointer to the OSP dt_device
1328  * \param[in] th                pointer to the transaction handler
1329  *
1330  * \retval                      0 for success
1331  * \retval                      negative error number on failure
1332  */
1333 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
1334                     struct thandle *th)
1335 {
1336         struct osp_thandle      *oth = thandle_to_osp_thandle(th);
1337
1338         if (oth->ot_super.th_sync)
1339                 oth->ot_our->our_flags |= UPDATE_FL_SYNC;
1340         /* For remote thandle, if there are local thandle, start it here*/
1341         if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
1342                 return dt_trans_start(env, oth->ot_storage_th->th_dev,
1343                                       oth->ot_storage_th);
1344         return 0;
1345 }
1346
1347 /**
1348  * The OSP layer dt_device_operations::dt_trans_stop() interface
1349  * to stop the transaction.
1350  *
1351  * If the transaction is a remote transaction, related remote
1352  * updates will be triggered here via osp_trans_trigger().
1353  *
1354  * For synchronous mode update or any failed update, the request
1355  * will be destroyed explicitly when the osp_trans_stop().
1356  *
1357  * Please refer to osp_trans_create() for transaction type.
1358  *
1359  * \param[in] env               pointer to the thread context
1360  * \param[in] dt                pointer to the OSP dt_device
1361  * \param[in] th                pointer to the transaction handler
1362  *
1363  * \retval                      0 for success
1364  * \retval                      negative error number on failure
1365  */
1366 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
1367                    struct thandle *th)
1368 {
1369         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
1370         struct osp_update_request *our = oth->ot_our;
1371         struct osp_device        *osp = dt2osp_dev(dt);
1372         int                      rc = 0;
1373         ENTRY;
1374
1375         /* For remote transaction, if there is local storage thandle,
1376          * stop it first */
1377         if (oth->ot_storage_th != NULL && th->th_top == NULL) {
1378                 dt_trans_stop(env, oth->ot_storage_th->th_dev,
1379                               oth->ot_storage_th);
1380                 oth->ot_storage_th = NULL;
1381         }
1382
1383         if (our == NULL || list_empty(&our->our_req_list)) {
1384                 osp_trans_callback(env, oth, th->th_result);
1385                 GOTO(out, rc = th->th_result);
1386         }
1387
1388         if (!osp->opd_connect_mdt) {
1389                 osp_trans_callback(env, oth, th->th_result);
1390                 rc = osp_send_update_req(env, osp, oth->ot_our);
1391                 GOTO(out, rc);
1392         }
1393
1394         if (osp->opd_update == NULL ||
1395             !osp_send_update_thread_running(osp)) {
1396                 osp_trans_callback(env, oth, -EIO);
1397                 GOTO(out, rc = -EIO);
1398         }
1399
1400         if (th->th_sync) {
1401                 /* if th_sync is set, then it needs to be sent
1402                  * right away. Note: even thought the RPC has been
1403                  * sent, it still needs to be added to the sending
1404                  * list (see osp_trans_trigger()), so ou_rpc_version
1405                  * can be updated correctly. */
1406                 rc = osp_send_update_req(env, osp, our);
1407                 our->our_req_sent = 1;
1408         }
1409
1410         osp_trans_trigger(env, osp, oth);
1411 out:
1412         osp_thandle_put(oth);
1413
1414         RETURN(rc);
1415 }