Whamcloud - gitweb
d00dbe7cb9084a0c0315637a5316c312167fdc7c
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into a osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * Author: Di Wang <di.wang@intel.com>
57  * Author: Fan, Yong <fan.yong@intel.com>
58  */
59
60 #define DEBUG_SUBSYSTEM S_MDS
61
62 #include "osp_internal.h"
63
64 struct osp_async_update_args {
65         struct dt_update_request *oaua_update;
66         atomic_t                 *oaua_count;
67         wait_queue_head_t        *oaua_waitq;
68         bool                      oaua_flow_control;
69 };
70
71 struct osp_async_request {
72         /* list in the dt_update_request::dur_cb_items */
73         struct list_head                 oar_list;
74
75         /* The target of the async update request. */
76         struct osp_object               *oar_obj;
77
78         /* The data used by oar_interpreter. */
79         void                            *oar_data;
80
81         /* The interpreter function called after the async request handled. */
82         osp_async_request_interpreter_t  oar_interpreter;
83 };
84
85 /**
86  * Allocate an asynchronous request and initialize it with the given parameters.
87  *
88  * \param[in] obj               pointer to the operation target
89  * \param[in] data              pointer to the data used by the interpreter
90  * \param[in] interpreter       pointer to the interpreter function
91  *
92  * \retval                      pointer to the asychronous request
93  * \retval                      NULL if the allocation failed
94  */
95 static struct osp_async_request *
96 osp_async_request_init(struct osp_object *obj, void *data,
97                        osp_async_request_interpreter_t interpreter)
98 {
99         struct osp_async_request *oar;
100
101         OBD_ALLOC_PTR(oar);
102         if (oar == NULL)
103                 return NULL;
104
105         lu_object_get(osp2lu_obj(obj));
106         INIT_LIST_HEAD(&oar->oar_list);
107         oar->oar_obj = obj;
108         oar->oar_data = data;
109         oar->oar_interpreter = interpreter;
110
111         return oar;
112 }
113
114 /**
115  * Destroy the asychronous request.
116  *
117  * \param[in] env       pointer to the thread context
118  * \param[in] oar       pointer to asychronous request
119  */
120 static void osp_async_request_fini(const struct lu_env *env,
121                                    struct osp_async_request *oar)
122 {
123         LASSERT(list_empty(&oar->oar_list));
124
125         lu_object_put(env, osp2lu_obj(oar->oar_obj));
126         OBD_FREE_PTR(oar);
127 }
128
129 /**
130  * Interpret the packaged OUT RPC results.
131  *
132  * For every packaged sub-request, call its registered interpreter function.
133  * Then destroy the sub-request.
134  *
135  * \param[in] env       pointer to the thread context
136  * \param[in] req       pointer to the RPC
137  * \param[in] arg       pointer to data used by the interpreter
138  * \param[in] rc        the RPC return value
139  *
140  * \retval              0 for success
141  * \retval              negative error number on failure
142  */
143 static int osp_async_update_interpret(const struct lu_env *env,
144                                       struct ptlrpc_request *req,
145                                       void *arg, int rc)
146 {
147         struct object_update_reply      *reply  = NULL;
148         struct osp_async_update_args    *oaua   = arg;
149         struct dt_update_request        *dt_update = oaua->oaua_update;
150         struct osp_async_request        *oar;
151         struct osp_async_request        *next;
152         int                              count  = 0;
153         int                              index  = 0;
154         int                              rc1    = 0;
155
156         if (oaua->oaua_flow_control)
157                 obd_put_request_slot(
158                                 &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
159
160         /* Unpack the results from the reply message. */
161         if (req->rq_repmsg != NULL) {
162                 reply = req_capsule_server_sized_get(&req->rq_pill,
163                                                      &RMF_OUT_UPDATE_REPLY,
164                                                      OUT_UPDATE_REPLY_SIZE);
165                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
166                         rc1 = -EPROTO;
167                 else
168                         count = reply->ourp_count;
169         } else {
170                 rc1 = rc;
171         }
172
173         list_for_each_entry_safe(oar, next, &dt_update->dur_cb_items,
174                                  oar_list) {
175                 list_del_init(&oar->oar_list);
176
177                 /* The peer may only have handled some requests (indicated
178                  * by the 'count') in the packaged OUT RPC, we can only get
179                  * results for the handled part. */
180                 if (index < count && reply->ourp_lens[index] > 0) {
181                         struct object_update_result *result;
182
183                         result = object_update_result_get(reply, index, NULL);
184                         if (result == NULL)
185                                 rc1 = -EPROTO;
186                         else
187                                 rc1 = result->our_rc;
188                 } else {
189                         rc1 = rc;
190                         if (unlikely(rc1 == 0))
191                                 rc1 = -EINVAL;
192                 }
193
194                 oar->oar_interpreter(env, reply, req, oar->oar_obj,
195                                        oar->oar_data, index, rc1);
196                 osp_async_request_fini(env, oar);
197                 index++;
198         }
199
200         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
201                 wake_up_all(oaua->oaua_waitq);
202
203         dt_update_request_destroy(dt_update);
204
205         return 0;
206 }
207
208 /**
209  * Pack all the requests in the shared asynchronous idempotent request queue
210  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
211  *
212  * \param[in] env       pointer to the thread context
213  * \param[in] osp       pointer to the OSP device
214  * \param[in] update    pointer to the shared queue
215  *
216  * \retval              0 for success
217  * \retval              negative error number on failure
218  */
219 int osp_unplug_async_request(const struct lu_env *env,
220                              struct osp_device *osp,
221                              struct dt_update_request *update)
222 {
223         struct osp_async_update_args    *args;
224         struct ptlrpc_request           *req = NULL;
225         int                              rc;
226
227         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
228                                  update->dur_buf.ub_req, &req);
229         if (rc != 0) {
230                 struct osp_async_request *oar;
231                 struct osp_async_request *next;
232
233                 list_for_each_entry_safe(oar, next,
234                                          &update->dur_cb_items, oar_list) {
235                         list_del_init(&oar->oar_list);
236                         oar->oar_interpreter(env, NULL, NULL, oar->oar_obj,
237                                                oar->oar_data, 0, rc);
238                         osp_async_request_fini(env, oar);
239                 }
240                 dt_update_request_destroy(update);
241         } else {
242                 LASSERT(list_empty(&update->dur_list));
243
244                 args = ptlrpc_req_async_args(req);
245                 args->oaua_update = update;
246                 args->oaua_count = NULL;
247                 args->oaua_waitq = NULL;
248                 args->oaua_flow_control = false;
249                 req->rq_interpret_reply = osp_async_update_interpret;
250                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
251         }
252
253         return rc;
254 }
255
256 /**
257  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
258  * request queue - osp_device::opd_async_requests.
259  *
260  * If the osp_device::opd_async_requests is not NULL, then return it directly;
261  * otherwise create new dt_update_request and attach it to opd_async_requests.
262  *
263  * \param[in] osp       pointer to the OSP device
264  *
265  * \retval              pointer to the shared queue
266  * \retval              negative error number on failure
267  */
268 static struct dt_update_request *
269 osp_find_or_create_async_update_request(struct osp_device *osp)
270 {
271         struct dt_update_request *update = osp->opd_async_requests;
272
273         if (update != NULL)
274                 return update;
275
276         update = dt_update_request_create(&osp->opd_dt_dev);
277         if (!IS_ERR(update))
278                 osp->opd_async_requests = update;
279
280         return update;
281 }
282
283 /**
284  * Insert an asynchronous idempotent request to the shared request queue that
285  * is attached to the osp_device.
286  *
287  * This function generates a new osp_async_request with the given parameters,
288  * then tries to insert the request into the osp_device-based shared request
289  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
290  * the shared queue firstly, and then re-tries.
291  *
292  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
293  *       osp_insert_async_request call from others.
294  *
295  * \param[in] env               pointer to the thread context
296  * \param[in] op                operation type, see 'enum update_type'
297  * \param[in] obj               pointer to the operation target
298  * \param[in] count             array size of the subsequent \a lens and \a bufs
299  * \param[in] lens              buffer length array for the subsequent \a bufs
300  * \param[in] bufs              the buffers to compose the request
301  * \param[in] data              pointer to the data used by the interpreter
302  * \param[in] interpreter       pointer to the interpreter function
303  *
304  * \retval                      0 for success
305  * \retval                      negative error number on failure
306  */
307 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
308                              struct osp_object *obj, int count,
309                              __u16 *lens, const void **bufs, void *data,
310                              osp_async_request_interpreter_t interpreter)
311 {
312         struct osp_async_request     *oar;
313         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
314         struct dt_update_request     *update;
315         int                           rc  = 0;
316         ENTRY;
317
318         oar = osp_async_request_init(obj, data, interpreter);
319         if (oar == NULL)
320                 RETURN(-ENOMEM);
321
322         update = osp_find_or_create_async_update_request(osp);
323         if (IS_ERR(update))
324                 GOTO(out, rc = PTR_ERR(update));
325
326 again:
327         /* The queue is full. */
328         rc = out_update_pack(env, &update->dur_buf, op,
329                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
330                              0);
331         if (rc == -E2BIG) {
332                 osp->opd_async_requests = NULL;
333                 mutex_unlock(&osp->opd_async_requests_mutex);
334
335                 rc = osp_unplug_async_request(env, osp, update);
336                 mutex_lock(&osp->opd_async_requests_mutex);
337                 if (rc != 0)
338                         GOTO(out, rc);
339
340                 update = osp_find_or_create_async_update_request(osp);
341                 if (IS_ERR(update))
342                         GOTO(out, rc = PTR_ERR(update));
343
344                 goto again;
345         }
346
347         if (rc == 0)
348                 list_add_tail(&oar->oar_list, &update->dur_cb_items);
349
350         GOTO(out, rc);
351
352 out:
353         if (rc != 0)
354                 osp_async_request_fini(env, oar);
355
356         return rc;
357 }
358
359 /**
360  * The OSP layer dt_device_operations::dt_trans_create() interface
361  * to create a transaction.
362  *
363  * There are two kinds of transactions that will involve OSP:
364  *
365  * 1) If the transaction only contains the updates on remote server
366  *    (MDT or OST), such as re-generating the lost OST-object for
367  *    LFSCK, then it is a remote transaction. For remote transaction,
368  *    the upper layer caller (such as the LFSCK engine) will call the
369  *    dt_trans_create() (with the OSP dt_device as the parameter),
370  *    then the call will be directed to the osp_trans_create() that
371  *    creates the transaction handler and returns it to the caller.
372  *
373  * 2) If the transcation contains both local and remote updates,
374  *    such as cross MDTs create under DNE mode, then the upper layer
375  *    caller will not trigger osp_trans_create(). Instead, it will
376  *    call dt_trans_create() on other dt_device, such as LOD that
377  *    will generate the transaction handler. Such handler will be
378  *    used by the whole transaction in subsequent sub-operations.
379  *
380  * \param[in] env       pointer to the thread context
381  * \param[in] d         pointer to the OSP dt_device
382  *
383  * \retval              pointer to the transaction handler
384  * \retval              negative error number on failure
385  */
386 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
387 {
388         struct thandle          *th = NULL;
389         struct thandle_update   *tu = NULL;
390         int                      rc = 0;
391
392         OBD_ALLOC_PTR(th);
393         if (unlikely(th == NULL))
394                 GOTO(out, rc = -ENOMEM);
395
396         th->th_dev = d;
397         th->th_tags = LCT_TX_HANDLE;
398         atomic_set(&th->th_refc, 1);
399         th->th_alloc_size = sizeof(*th);
400
401         OBD_ALLOC_PTR(tu);
402         if (tu == NULL)
403                 GOTO(out, rc = -ENOMEM);
404
405         INIT_LIST_HEAD(&tu->tu_remote_update_list);
406         tu->tu_only_remote_trans = 1;
407         th->th_update = tu;
408
409 out:
410         if (rc != 0) {
411                 if (tu != NULL)
412                         OBD_FREE_PTR(tu);
413                 if (th != NULL)
414                         OBD_FREE_PTR(th);
415                 th = ERR_PTR(rc);
416         }
417
418         return th;
419 }
420
421 /**
422  * Prepare update request.
423  *
424  * Prepare OUT update ptlrpc request, and the request usually includes
425  * all of updates (stored in \param ureq) from one operation.
426  *
427  * \param[in] env       execution environment
428  * \param[in] imp       import on which ptlrpc request will be sent
429  * \param[in] ureq      hold all of updates which will be packed into the req
430  * \param[in] reqp      request to be created
431  *
432  * \retval              0 if preparation succeeds.
433  * \retval              negative errno if preparation fails.
434  */
435 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
436                         const struct object_update_request *ureq,
437                         struct ptlrpc_request **reqp)
438 {
439         struct ptlrpc_request           *req;
440         struct object_update_request    *tmp;
441         int                             ureq_len;
442         int                             rc;
443         ENTRY;
444
445         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
446         if (req == NULL)
447                 RETURN(-ENOMEM);
448
449         ureq_len = object_update_request_size(ureq);
450         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
451                              ureq_len);
452
453         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
454         if (rc != 0) {
455                 ptlrpc_req_finished(req);
456                 RETURN(rc);
457         }
458
459         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
460                              RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
461
462         tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
463         memcpy(tmp, ureq, ureq_len);
464
465         ptlrpc_request_set_replen(req);
466         req->rq_request_portal = OUT_PORTAL;
467         req->rq_reply_portal = OSC_REPLY_PORTAL;
468         *reqp = req;
469
470         RETURN(rc);
471 }
472
473 /**
474  * Send update RPC.
475  *
476  * Send update request to the remote MDT synchronously.
477  *
478  * \param[in] env       execution environment
479  * \param[in] imp       import on which ptlrpc request will be sent
480  * \param[in] dt_update hold all of updates which will be packed into the req
481  * \param[in] reqp      request to be created
482  *
483  * \retval              0 if RPC succeeds.
484  * \retval              negative errno if RPC fails.
485  */
486 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
487                     struct dt_update_request *dt_update,
488                     struct ptlrpc_request **reqp, bool rpc_lock)
489 {
490         struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
491         struct ptlrpc_request   *req = NULL;
492         int                     rc;
493         ENTRY;
494
495         rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
496         if (rc != 0)
497                 RETURN(rc);
498
499         /* Note: some dt index api might return non-zero result here, like
500          * osd_index_ea_lookup, so we should only check rc < 0 here */
501         if (rpc_lock)
502                 osp_get_rpc_lock(osp);
503         rc = ptlrpc_queue_wait(req);
504         if (rpc_lock)
505                 osp_put_rpc_lock(osp);
506         if (rc < 0) {
507                 ptlrpc_req_finished(req);
508                 dt_update->dur_rc = rc;
509                 RETURN(rc);
510         }
511
512         if (reqp != NULL) {
513                 *reqp = req;
514                 RETURN(rc);
515         }
516
517         dt_update->dur_rc = rc;
518
519         ptlrpc_req_finished(req);
520
521         RETURN(rc);
522 }
523
524 /**
525  * Trigger the request for remote updates.
526  *
527  * If the transaction is not a remote one or it is required to be sync mode
528  * (th->th_sync is set), then it will be sent synchronously; otherwise, the
529  * RPC will be sent asynchronously.
530  *
531  * Please refer to osp_trans_create() for transaction type.
532  *
533  * \param[in] env               pointer to the thread context
534  * \param[in] osp               pointer to the OSP device
535  * \param[in] dt_update         pointer to the dt_update_request
536  * \param[in] th                pointer to the transaction handler
537  * \param[in] flow_control      whether need to control the flow
538  *
539  * \retval                      0 for success
540  * \retval                      negative error number on failure
541  */
542 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
543                              struct dt_update_request *dt_update,
544                              struct thandle *th, bool flow_control)
545 {
546         struct thandle_update   *tu = th->th_update;
547         int                      rc = 0;
548
549         LASSERT(tu != NULL);
550
551         if (is_only_remote_trans(th)) {
552                 struct osp_async_update_args    *args;
553                 struct ptlrpc_request           *req;
554
555                 list_del_init(&dt_update->dur_list);
556                 if (th->th_sync) {
557                         rc = osp_remote_sync(env, osp, dt_update, NULL, true);
558                         dt_update_request_destroy(dt_update);
559
560                         return rc;
561                 }
562
563                 rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
564                                          dt_update->dur_buf.ub_req, &req);
565                 if (rc == 0) {
566                         down_read(&osp->opd_async_updates_rwsem);
567
568                         args = ptlrpc_req_async_args(req);
569                         args->oaua_update = dt_update;
570                         args->oaua_count = &osp->opd_async_updates_count;
571                         args->oaua_waitq = &osp->opd_syn_barrier_waitq;
572                         args->oaua_flow_control = flow_control;
573                         req->rq_interpret_reply =
574                                 osp_async_update_interpret;
575
576                         atomic_inc(args->oaua_count);
577                         up_read(&osp->opd_async_updates_rwsem);
578
579                         ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
580                 } else {
581                         dt_update_request_destroy(dt_update);
582                 }
583         } else {
584                 th->th_sync = 1;
585                 rc = osp_remote_sync(env, osp, dt_update, NULL, true);
586         }
587
588         return rc;
589 }
590
591 /**
592  * The OSP layer dt_device_operations::dt_trans_start() interface
593  * to start the transaction.
594  *
595  * If the transaction is a remote transaction, then related remote
596  * updates will be triggered in the osp_trans_stop(); otherwise the
597  * transaction contains both local and remote update(s), then when
598  * the OUT RPC will be triggered depends on the operation, and is
599  * indicated by the dt_device::tu_sent_after_local_trans, for example:
600  *
601  * 1) If it is remote create, it will send the remote req after local
602  * transaction. i.e. create the object locally first, then insert the
603  * remote name entry.
604  *
605  * 2) If it is remote unlink, it will send the remote req before the
606  * local transaction, i.e. delete the name entry remotely first, then
607  * destroy the local object.
608  *
609  * Please refer to osp_trans_create() for transaction type.
610  *
611  * \param[in] env               pointer to the thread context
612  * \param[in] dt                pointer to the OSP dt_device
613  * \param[in] th                pointer to the transaction handler
614  *
615  * \retval                      0 for success
616  * \retval                      negative error number on failure
617  */
618 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
619                     struct thandle *th)
620 {
621         struct thandle_update           *tu = th->th_update;
622         struct dt_update_request        *dt_update;
623         int                              rc = 0;
624
625         if (tu == NULL)
626                 return rc;
627
628         /* Check whether there are updates related with this OSP */
629         dt_update = out_find_update(tu, dt);
630         if (dt_update == NULL)
631                 return rc;
632
633         if (!is_only_remote_trans(th) && !tu->tu_sent_after_local_trans)
634                 rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
635                                        false);
636
637         return rc;
638 }
639
640 /**
641  * The OSP layer dt_device_operations::dt_trans_stop() interface
642  * to stop the transaction.
643  *
644  * If the transaction is a remote transaction, or the update handler
645  * is marked as 'tu_sent_after_local_trans', then related remote
646  * updates will be triggered here via osp_trans_trigger().
647  *
648  * For synchronous mode update or any failed update, the request
649  * will be destroyed explicitly when the osp_trans_stop().
650  *
651  * Please refer to osp_trans_create() for transaction type.
652  *
653  * \param[in] env               pointer to the thread context
654  * \param[in] dt                pointer to the OSP dt_device
655  * \param[in] th                pointer to the transaction handler
656  *
657  * \retval                      0 for success
658  * \retval                      negative error number on failure
659  */
660 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
661                    struct thandle *th)
662 {
663         struct thandle_update           *tu = th->th_update;
664         struct dt_update_request        *dt_update;
665         int                              rc = 0;
666         ENTRY;
667
668         LASSERT(tu != NULL);
669         LASSERT(tu != LP_POISON);
670
671         /* Check whether there are updates related with this OSP */
672         dt_update = out_find_update(tu, dt);
673         if (dt_update == NULL) {
674                 if (!is_only_remote_trans(th))
675                         RETURN(rc);
676
677                 GOTO(put, rc);
678         }
679
680         if (dt_update->dur_buf.ub_req == NULL ||
681             dt_update->dur_buf.ub_req->ourq_count == 0) {
682                 dt_update_request_destroy(dt_update);
683                 GOTO(put, rc);
684         }
685
686         if (is_only_remote_trans(th)) {
687                 if (th->th_result == 0) {
688                         struct osp_device *osp = dt2osp_dev(th->th_dev);
689                         struct client_obd *cli = &osp->opd_obd->u.cli;
690
691                         rc = obd_get_request_slot(cli);
692                         if (!osp->opd_imp_active || !osp->opd_imp_connected) {
693                                 if (rc == 0)
694                                         obd_put_request_slot(cli);
695
696                                 rc = -ENOTCONN;
697                         }
698
699                         if (rc != 0) {
700                                 dt_update_request_destroy(dt_update);
701                                 GOTO(put, rc);
702                         }
703
704                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
705                                                dt_update, th, true);
706                         if (rc != 0)
707                                 obd_put_request_slot(cli);
708                 } else {
709                         rc = th->th_result;
710                         dt_update_request_destroy(dt_update);
711                 }
712         } else {
713                 if (tu->tu_sent_after_local_trans)
714                         rc = osp_trans_trigger(env, dt2osp_dev(dt),
715                                                dt_update, th, false);
716                 rc = dt_update->dur_rc;
717                 dt_update_request_destroy(dt_update);
718         }
719
720         GOTO(put, rc);
721
722 put:
723         thandle_put(th);
724         return rc;
725 }