Whamcloud - gitweb
3daed7334e99b9228ac211a74cb5787b12bc482a
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into a osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * Author: Di Wang <di.wang@intel.com>
57  * Author: Fan, Yong <fan.yong@intel.com>
58  */
59
60 #define DEBUG_SUBSYSTEM S_MDS
61
62 #include "osp_internal.h"
63
64 struct osp_async_update_args {
65         struct dt_update_request *oaua_update;
66         atomic_t                 *oaua_count;
67         wait_queue_head_t        *oaua_waitq;
68         bool                      oaua_flow_control;
69 };
70
71 struct osp_async_request {
72         /* list in the dt_update_request::dur_cb_items */
73         struct list_head                 oar_list;
74
75         /* The target of the async update request. */
76         struct osp_object               *oar_obj;
77
78         /* The data used by oar_interpreter. */
79         void                            *oar_data;
80
81         /* The interpreter function called after the async request handled. */
82         osp_async_request_interpreter_t  oar_interpreter;
83 };
84
85 /**
86  * Allocate an asynchronous request and initialize it with the given parameters.
87  *
88  * \param[in] obj               pointer to the operation target
89  * \param[in] data              pointer to the data used by the interpreter
90  * \param[in] interpreter       pointer to the interpreter function
91  *
92  * \retval                      pointer to the asychronous request
93  * \retval                      NULL if the allocation failed
94  */
95 static struct osp_async_request *
96 osp_async_request_init(struct osp_object *obj, void *data,
97                        osp_async_request_interpreter_t interpreter)
98 {
99         struct osp_async_request *oar;
100
101         OBD_ALLOC_PTR(oar);
102         if (oar == NULL)
103                 return NULL;
104
105         lu_object_get(osp2lu_obj(obj));
106         INIT_LIST_HEAD(&oar->oar_list);
107         oar->oar_obj = obj;
108         oar->oar_data = data;
109         oar->oar_interpreter = interpreter;
110
111         return oar;
112 }
113
114 /**
115  * Destroy the asychronous request.
116  *
117  * \param[in] env       pointer to the thread context
118  * \param[in] oar       pointer to asychronous request
119  */
120 static void osp_async_request_fini(const struct lu_env *env,
121                                    struct osp_async_request *oar)
122 {
123         LASSERT(list_empty(&oar->oar_list));
124
125         lu_object_put(env, osp2lu_obj(oar->oar_obj));
126         OBD_FREE_PTR(oar);
127 }
128
129 /**
130  * Interpret the packaged OUT RPC results.
131  *
132  * For every packaged sub-request, call its registered interpreter function.
133  * Then destroy the sub-request.
134  *
135  * \param[in] env       pointer to the thread context
136  * \param[in] req       pointer to the RPC
137  * \param[in] arg       pointer to data used by the interpreter
138  * \param[in] rc        the RPC return value
139  *
140  * \retval              0 for success
141  * \retval              negative error number on failure
142  */
143 static int osp_async_update_interpret(const struct lu_env *env,
144                                       struct ptlrpc_request *req,
145                                       void *arg, int rc)
146 {
147         struct object_update_reply      *reply  = NULL;
148         struct osp_async_update_args    *oaua   = arg;
149         struct dt_update_request        *dt_update = oaua->oaua_update;
150         struct osp_async_request        *oar;
151         struct osp_async_request        *next;
152         int                              count  = 0;
153         int                              index  = 0;
154         int                              rc1    = 0;
155
156         if (oaua->oaua_flow_control)
157                 obd_put_request_slot(
158                                 &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
159
160         /* Unpack the results from the reply message. */
161         if (req->rq_repmsg != NULL) {
162                 reply = req_capsule_server_sized_get(&req->rq_pill,
163                                                      &RMF_OUT_UPDATE_REPLY,
164                                                      OUT_UPDATE_REPLY_SIZE);
165                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
166                         rc1 = -EPROTO;
167                 else
168                         count = reply->ourp_count;
169         } else {
170                 rc1 = rc;
171         }
172
173         list_for_each_entry_safe(oar, next, &dt_update->dur_cb_items,
174                                  oar_list) {
175                 list_del_init(&oar->oar_list);
176
177                 /* The peer may only have handled some requests (indicated
178                  * by the 'count') in the packaged OUT RPC, we can only get
179                  * results for the handled part. */
180                 if (index < count && reply->ourp_lens[index] > 0) {
181                         struct object_update_result *result;
182
183                         result = object_update_result_get(reply, index, NULL);
184                         if (result == NULL)
185                                 rc1 = -EPROTO;
186                         else
187                                 rc1 = result->our_rc;
188                 } else {
189                         rc1 = rc;
190                         if (unlikely(rc1 == 0))
191                                 rc1 = -EINVAL;
192                 }
193
194                 oar->oar_interpreter(env, reply, req, oar->oar_obj,
195                                        oar->oar_data, index, rc1);
196                 osp_async_request_fini(env, oar);
197                 index++;
198         }
199
200         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
201                 wake_up_all(oaua->oaua_waitq);
202
203         dt_update_request_destroy(dt_update);
204
205         return 0;
206 }
207
208 /**
209  * Pack all the requests in the shared asynchronous idempotent request queue
210  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
211  *
212  * \param[in] env       pointer to the thread context
213  * \param[in] osp       pointer to the OSP device
214  * \param[in] update    pointer to the shared queue
215  *
216  * \retval              0 for success
217  * \retval              negative error number on failure
218  */
219 int osp_unplug_async_request(const struct lu_env *env,
220                              struct osp_device *osp,
221                              struct dt_update_request *update)
222 {
223         struct osp_async_update_args    *args;
224         struct ptlrpc_request           *req = NULL;
225         int                              rc;
226
227         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
228                                  update->dur_buf.ub_req, &req);
229         if (rc != 0) {
230                 struct osp_async_request *oar;
231                 struct osp_async_request *next;
232
233                 list_for_each_entry_safe(oar, next,
234                                          &update->dur_cb_items, oar_list) {
235                         list_del_init(&oar->oar_list);
236                         oar->oar_interpreter(env, NULL, NULL, oar->oar_obj,
237                                                oar->oar_data, 0, rc);
238                         osp_async_request_fini(env, oar);
239                 }
240                 dt_update_request_destroy(update);
241         } else {
242                 args = ptlrpc_req_async_args(req);
243                 args->oaua_update = update;
244                 args->oaua_count = NULL;
245                 args->oaua_waitq = NULL;
246                 args->oaua_flow_control = false;
247                 req->rq_interpret_reply = osp_async_update_interpret;
248                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
249         }
250
251         return rc;
252 }
253
254 /**
255  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
256  * request queue - osp_device::opd_async_requests.
257  *
258  * If the osp_device::opd_async_requests is not NULL, then return it directly;
259  * otherwise create new dt_update_request and attach it to opd_async_requests.
260  *
261  * \param[in] osp       pointer to the OSP device
262  *
263  * \retval              pointer to the shared queue
264  * \retval              negative error number on failure
265  */
266 static struct dt_update_request *
267 osp_find_or_create_async_update_request(struct osp_device *osp)
268 {
269         struct dt_update_request *update = osp->opd_async_requests;
270
271         if (update != NULL)
272                 return update;
273
274         update = dt_update_request_create(&osp->opd_dt_dev);
275         if (!IS_ERR(update))
276                 osp->opd_async_requests = update;
277
278         return update;
279 }
280
281 /**
282  * Insert an asynchronous idempotent request to the shared request queue that
283  * is attached to the osp_device.
284  *
285  * This function generates a new osp_async_request with the given parameters,
286  * then tries to insert the request into the osp_device-based shared request
287  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
288  * the shared queue firstly, and then re-tries.
289  *
290  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
291  *       osp_insert_async_request call from others.
292  *
293  * \param[in] env               pointer to the thread context
294  * \param[in] op                operation type, see 'enum update_type'
295  * \param[in] obj               pointer to the operation target
296  * \param[in] count             array size of the subsequent \a lens and \a bufs
297  * \param[in] lens              buffer length array for the subsequent \a bufs
298  * \param[in] bufs              the buffers to compose the request
299  * \param[in] data              pointer to the data used by the interpreter
300  * \param[in] interpreter       pointer to the interpreter function
301  *
302  * \retval                      0 for success
303  * \retval                      negative error number on failure
304  */
305 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
306                              struct osp_object *obj, int count,
307                              __u16 *lens, const void **bufs, void *data,
308                              osp_async_request_interpreter_t interpreter)
309 {
310         struct osp_async_request     *oar;
311         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
312         struct dt_update_request     *update;
313         int                           rc  = 0;
314         ENTRY;
315
316         oar = osp_async_request_init(obj, data, interpreter);
317         if (oar == NULL)
318                 RETURN(-ENOMEM);
319
320         update = osp_find_or_create_async_update_request(osp);
321         if (IS_ERR(update))
322                 GOTO(out, rc = PTR_ERR(update));
323
324 again:
325         /* The queue is full. */
326         rc = out_update_pack(env, &update->dur_buf, op,
327                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
328                              0);
329         if (rc == -E2BIG) {
330                 osp->opd_async_requests = NULL;
331                 mutex_unlock(&osp->opd_async_requests_mutex);
332
333                 rc = osp_unplug_async_request(env, osp, update);
334                 mutex_lock(&osp->opd_async_requests_mutex);
335                 if (rc != 0)
336                         GOTO(out, rc);
337
338                 update = osp_find_or_create_async_update_request(osp);
339                 if (IS_ERR(update))
340                         GOTO(out, rc = PTR_ERR(update));
341
342                 goto again;
343         }
344
345         if (rc == 0)
346                 list_add_tail(&oar->oar_list, &update->dur_cb_items);
347
348         GOTO(out, rc);
349
350 out:
351         if (rc != 0)
352                 osp_async_request_fini(env, oar);
353
354         return rc;
355 }
356
357 /**
358  * The OSP layer dt_device_operations::dt_trans_create() interface
359  * to create a transaction.
360  *
361  * There are two kinds of transactions that will involve OSP:
362  *
363  * 1) If the transaction only contains the updates on remote server
364  *    (MDT or OST), such as re-generating the lost OST-object for
365  *    LFSCK, then it is a remote transaction. For remote transaction,
366  *    the upper layer caller (such as the LFSCK engine) will call the
367  *    dt_trans_create() (with the OSP dt_device as the parameter),
368  *    then the call will be directed to the osp_trans_create() that
369  *    creates the transaction handler and returns it to the caller.
370  *
371  * 2) If the transcation contains both local and remote updates,
372  *    such as cross MDTs create under DNE mode, then the upper layer
373  *    caller will not trigger osp_trans_create(). Instead, it will
374  *    call dt_trans_create() on other dt_device, such as LOD that
375  *    will generate the transaction handler. Such handler will be
376  *    used by the whole transaction in subsequent sub-operations.
377  *
378  * \param[in] env       pointer to the thread context
379  * \param[in] d         pointer to the OSP dt_device
380  *
381  * \retval              pointer to the transaction handler
382  * \retval              negative error number on failure
383  */
384 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
385 {
386         struct osp_thandle              *oth;
387         struct thandle                  *th = NULL;
388         struct dt_update_request        *update;
389         ENTRY;
390
391         OBD_ALLOC_PTR(oth);
392         if (unlikely(oth == NULL))
393                 RETURN(ERR_PTR(-ENOMEM));
394
395         th = &oth->ot_super;
396         th->th_dev = d;
397         th->th_tags = LCT_TX_HANDLE;
398
399         update = dt_update_request_create(d);
400         if (IS_ERR(update)) {
401                 OBD_FREE_PTR(oth);
402                 RETURN(ERR_CAST(update));
403         }
404
405         oth->ot_dur = update;
406         oth->ot_send_updates_after_local_trans = false;
407
408         RETURN(th);
409 }
410
411 /**
412  * Prepare update request.
413  *
414  * Prepare OUT update ptlrpc request, and the request usually includes
415  * all of updates (stored in \param ureq) from one operation.
416  *
417  * \param[in] env       execution environment
418  * \param[in] imp       import on which ptlrpc request will be sent
419  * \param[in] ureq      hold all of updates which will be packed into the req
420  * \param[in] reqp      request to be created
421  *
422  * \retval              0 if preparation succeeds.
423  * \retval              negative errno if preparation fails.
424  */
425 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
426                         const struct object_update_request *ureq,
427                         struct ptlrpc_request **reqp)
428 {
429         struct ptlrpc_request           *req;
430         struct object_update_request    *tmp;
431         int                             ureq_len;
432         int                             rc;
433         ENTRY;
434
435         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
436         if (req == NULL)
437                 RETURN(-ENOMEM);
438
439         ureq_len = object_update_request_size(ureq);
440         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
441                              ureq_len);
442
443         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
444         if (rc != 0) {
445                 ptlrpc_req_finished(req);
446                 RETURN(rc);
447         }
448
449         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
450                              RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
451
452         tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
453         memcpy(tmp, ureq, ureq_len);
454
455         ptlrpc_request_set_replen(req);
456         req->rq_request_portal = OUT_PORTAL;
457         req->rq_reply_portal = OSC_REPLY_PORTAL;
458         *reqp = req;
459
460         RETURN(rc);
461 }
462
463 /**
464  * Send update RPC.
465  *
466  * Send update request to the remote MDT synchronously.
467  *
468  * \param[in] env       execution environment
469  * \param[in] imp       import on which ptlrpc request will be sent
470  * \param[in] dt_update hold all of updates which will be packed into the req
471  * \param[in] reqp      request to be created
472  *
473  * \retval              0 if RPC succeeds.
474  * \retval              negative errno if RPC fails.
475  */
476 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
477                     struct dt_update_request *dt_update,
478                     struct ptlrpc_request **reqp, bool rpc_lock)
479 {
480         struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
481         struct ptlrpc_request   *req = NULL;
482         int                     rc;
483         ENTRY;
484
485         rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
486         if (rc != 0)
487                 RETURN(rc);
488
489         /* Note: some dt index api might return non-zero result here, like
490          * osd_index_ea_lookup, so we should only check rc < 0 here */
491         if (rpc_lock)
492                 osp_get_rpc_lock(osp);
493         rc = ptlrpc_queue_wait(req);
494         if (rpc_lock)
495                 osp_put_rpc_lock(osp);
496         if (rc < 0) {
497                 ptlrpc_req_finished(req);
498                 dt_update->dur_rc = rc;
499                 RETURN(rc);
500         }
501
502         if (reqp != NULL) {
503                 *reqp = req;
504                 RETURN(rc);
505         }
506
507         dt_update->dur_rc = rc;
508
509         ptlrpc_req_finished(req);
510
511         RETURN(rc);
512 }
513
514 /**
515  * Trigger the request for remote updates.
516  *
517  * If the transaction is not a remote one or it is required to be sync mode
518  * (th->th_sync is set), then it will be sent synchronously; otherwise, the
519  * RPC will be sent asynchronously.
520  *
521  * Please refer to osp_trans_create() for transaction type.
522  *
523  * \param[in] env               pointer to the thread context
524  * \param[in] osp               pointer to the OSP device
525  * \param[in] dt_update         pointer to the dt_update_request
526  * \param[in] th                pointer to the transaction handler
527  * \param[in] flow_control      whether need to control the flow
528  *
529  * \retval                      0 for success
530  * \retval                      negative error number on failure
531  */
532 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
533                              struct dt_update_request *dt_update,
534                              struct thandle *th, bool flow_control)
535 {
536         int     rc = 0;
537
538         if (is_only_remote_trans(th) && !th->th_sync) {
539                 struct osp_async_update_args    *args;
540                 struct ptlrpc_request           *req;
541
542                 rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
543                                          dt_update->dur_buf.ub_req, &req);
544                 if (rc != 0)
545                         return rc;
546                 down_read(&osp->opd_async_updates_rwsem);
547
548                 args = ptlrpc_req_async_args(req);
549                 args->oaua_update = dt_update;
550                 args->oaua_count = &osp->opd_async_updates_count;
551                 args->oaua_waitq = &osp->opd_syn_barrier_waitq;
552                 args->oaua_flow_control = flow_control;
553                 req->rq_interpret_reply =
554                         osp_async_update_interpret;
555
556                 atomic_inc(args->oaua_count);
557                 up_read(&osp->opd_async_updates_rwsem);
558
559                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
560         } else {
561                 rc = osp_remote_sync(env, osp, dt_update, NULL, true);
562         }
563
564         return rc;
565 }
566
567 /**
568  * Get local thandle for osp_thandle
569  *
570  * Get the local OSD thandle from the OSP thandle. Currently, there
571  * are a few OSP API (osp_object_create() and osp_sync_add()) needs
572  * to update the object on local OSD device.
573  *
574  * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
575  * we will get local thandle by thandle_get_sub_by_dt.
576  *
577  * If the osp_thandle is remote thandle (th_top == NULL, only used
578  * by LFSCK), then it will create a local thandle, and stop it in
579  * osp_trans_stop(). And this only happens on OSP for OST.
580  *
581  * These are temporary solution, once OSP accessing OSD object is
582  * being fixed properly, this function should be removed. XXX
583  *
584  * \param[in] env               pointer to the thread context
585  * \param[in] th                pointer to the transaction handler
586  * \param[in] dt                pointer to the OSP device
587  *
588  * \retval                      pointer to the local thandle
589  * \retval                      ERR_PTR(errno) if it fails.
590  **/
591 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
592                                         struct thandle *th,
593                                         struct osp_device *osp)
594 {
595         struct osp_thandle      *oth;
596         struct thandle          *local_th;
597
598         if (th->th_top != NULL)
599                 return thandle_get_sub_by_dt(env, th->th_top,
600                                              osp->opd_storage);
601
602         LASSERT(!osp->opd_connect_mdt);
603         oth = thandle_to_osp_thandle(th);
604         if (oth->ot_storage_th != NULL)
605                 return oth->ot_storage_th;
606
607         local_th = dt_trans_create(env, osp->opd_storage);
608         if (IS_ERR(local_th))
609                 return local_th;
610
611         oth->ot_storage_th = local_th;
612
613         return local_th;
614 }
615
616 /**
617  * The OSP layer dt_device_operations::dt_trans_start() interface
618  * to start the transaction.
619  *
620  * If the transaction is a remote transaction, then related remote
621  * updates will be triggered in the osp_trans_stop(); otherwise the
622  * transaction contains both local and remote update(s), then when
623  * the OUT RPC will be triggered depends on the operation, and is
624  * indicated by the dt_device::tu_sent_after_local_trans, for example:
625  *
626  * 1) If it is remote create, it will send the remote req after local
627  * transaction. i.e. create the object locally first, then insert the
628  * remote name entry.
629  *
630  * 2) If it is remote unlink, it will send the remote req before the
631  * local transaction, i.e. delete the name entry remotely first, then
632  * destroy the local object.
633  *
634  * Please refer to osp_trans_create() for transaction type.
635  *
636  * \param[in] env               pointer to the thread context
637  * \param[in] dt                pointer to the OSP dt_device
638  * \param[in] th                pointer to the transaction handler
639  *
640  * \retval                      0 for success
641  * \retval                      negative error number on failure
642  */
643 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
644                     struct thandle *th)
645 {
646         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
647         struct dt_update_request *dt_update;
648         int                      rc = 0;
649
650         dt_update = oth->ot_dur;
651         LASSERT(dt_update != NULL);
652
653         /* return if there are no updates,  */
654         if (dt_update->dur_buf.ub_req == NULL ||
655             dt_update->dur_buf.ub_req->ourq_count == 0)
656                 GOTO(out, rc = 0);
657
658         /* Note: some updates needs to send before local transaction,
659          * some needs to send after local transaction.
660          *
661          * If the transaction only includes remote updates, it will
662          * send updates to remote MDT in osp_trans_stop.
663          *
664          * If it is remote create, it will send the remote req after
665          * local transaction. i.e. create the object locally first,
666          * then insert the name entry.
667          *
668          * If it is remote unlink, it will send the remote req before
669          * the local transaction, i.e. delete the name entry remote
670          * first, then destroy the local object. */
671         if (!is_only_remote_trans(th) &&
672             !oth->ot_send_updates_after_local_trans)
673                 rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
674                                        false);
675
676 out:
677         /* For remote thandle, if there are local thandle, start it here*/
678         if (th->th_top == NULL && oth->ot_storage_th != NULL)
679                 rc = dt_trans_start(env, oth->ot_storage_th->th_dev,
680                                     oth->ot_storage_th);
681
682         return rc;
683 }
684
685 /**
686  * The OSP layer dt_device_operations::dt_trans_stop() interface
687  * to stop the transaction.
688  *
689  * If the transaction is a remote transaction, or the update handler
690  * is marked as 'tu_sent_after_local_trans', then related remote
691  * updates will be triggered here via osp_trans_trigger().
692  *
693  * For synchronous mode update or any failed update, the request
694  * will be destroyed explicitly when the osp_trans_stop().
695  *
696  * Please refer to osp_trans_create() for transaction type.
697  *
698  * \param[in] env               pointer to the thread context
699  * \param[in] dt                pointer to the OSP dt_device
700  * \param[in] th                pointer to the transaction handler
701  *
702  * \retval                      0 for success
703  * \retval                      negative error number on failure
704  */
705 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
706                    struct thandle *th)
707 {
708
709         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
710         struct dt_update_request *dt_update;
711         int                      rc = 0;
712         bool                     keep_dt_update = false;
713         ENTRY;
714
715         dt_update = oth->ot_dur;
716         LASSERT(dt_update != NULL);
717         LASSERT(dt_update != LP_POISON);
718
719         /* For remote transaction, if there is local storage thandle,
720          * stop it first */
721         if (oth->ot_storage_th != NULL && th->th_top == NULL) {
722                 dt_trans_stop(env, oth->ot_storage_th->th_dev,
723                               oth->ot_storage_th);
724                 oth->ot_storage_th = NULL;
725         }
726         /* If there are no updates, destroy dt_update and thandle */
727         if (dt_update->dur_buf.ub_req == NULL ||
728             dt_update->dur_buf.ub_req->ourq_count == 0)
729                 GOTO(out, rc);
730
731         if (is_only_remote_trans(th) && !th->th_sync) {
732                 struct osp_device *osp = dt2osp_dev(th->th_dev);
733                 struct client_obd *cli = &osp->opd_obd->u.cli;
734
735                 if (th->th_result != 0) {
736                         rc = th->th_result;
737                         GOTO(out, rc);
738                 }
739
740                 rc = obd_get_request_slot(cli);
741                 if (!osp->opd_imp_active || !osp->opd_imp_connected) {
742                         if (rc == 0)
743                                 obd_put_request_slot(cli);
744                         rc = -ENOTCONN;
745                 }
746                 if (rc != 0)
747                         GOTO(out, rc);
748
749                 rc = osp_trans_trigger(env, dt2osp_dev(dt),
750                                        dt_update, th, true);
751                 if (rc != 0)
752                         obd_put_request_slot(cli);
753                 else
754                         keep_dt_update = true;
755         } else {
756                 if (oth->ot_send_updates_after_local_trans ||
757                    (is_only_remote_trans(th) && th->th_sync))
758                         rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
759                                                th, false);
760                 rc = dt_update->dur_rc;
761         }
762
763 out:
764         if (!keep_dt_update)
765                 dt_update_request_destroy(dt_update);
766         OBD_FREE_PTR(oth);
767
768         RETURN(rc);
769 }