Whamcloud - gitweb
LU-5710 all: second batch of corrected typos and grammar errors
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into an osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * There are three kinds of transactions
57  *
58  * 1. Local transaction, all of updates of the transaction are in the local MDT.
59  * 2. Remote transaction, all of updates of the transaction are in one remote
60  * MDT, which only happens in LFSCK now.
61  * 3. Distribute transaction, updates for the transaction are in mulitple MDTs.
62  *
63  * Author: Di Wang <di.wang@intel.com>
64  * Author: Fan, Yong <fan.yong@intel.com>
65  */
66
67 #define DEBUG_SUBSYSTEM S_MDS
68
69 #include "osp_internal.h"
70
71 /**
72  * The argument for the interpreter callback of osp request.
73  */
74 struct osp_update_args {
75         struct dt_update_request *oaua_update;
76         atomic_t                 *oaua_count;
77         wait_queue_head_t        *oaua_waitq;
78         bool                      oaua_flow_control;
79 };
80
81 /**
82  * Call back for each update request.
83  */
84 struct osp_update_callback {
85         /* list in the dt_update_request::dur_cb_items */
86         struct list_head                 ouc_list;
87
88         /* The target of the async update request. */
89         struct osp_object               *ouc_obj;
90
91         /* The data used by or_interpreter. */
92         void                            *ouc_data;
93
94         /* The interpreter function called after the async request handled. */
95         osp_update_interpreter_t        ouc_interpreter;
96 };
97
98 static struct object_update_request *object_update_request_alloc(size_t size)
99 {
100         struct object_update_request *ourq;
101
102         OBD_ALLOC_LARGE(ourq, size);
103         if (ourq == NULL)
104                 return ERR_PTR(-ENOMEM);
105
106         ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
107         ourq->ourq_count = 0;
108
109         return ourq;
110 }
111
112 static void object_update_request_free(struct object_update_request *ourq,
113                                        size_t ourq_size)
114 {
115         if (ourq != NULL)
116                 OBD_FREE_LARGE(ourq, ourq_size);
117 }
118
119 /**
120  * Allocate and initialize dt_update_request
121  *
122  * dt_update_request is being used to track updates being executed on
123  * this dt_device(OSD or OSP). The update buffer will be 4k initially,
124  * and increased if needed.
125  *
126  * \param [in] dt       dt device
127  *
128  * \retval              dt_update_request being allocated if succeed
129  * \retval              ERR_PTR(errno) if failed
130  */
131 struct dt_update_request *dt_update_request_create(struct dt_device *dt)
132 {
133         struct dt_update_request *dt_update;
134         struct object_update_request *ourq;
135
136         OBD_ALLOC_PTR(dt_update);
137         if (dt_update == NULL)
138                 return ERR_PTR(-ENOMEM);
139
140         ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
141         if (IS_ERR(ourq)) {
142                 OBD_FREE_PTR(dt_update);
143                 return ERR_CAST(ourq);
144         }
145
146         dt_update->dur_buf.ub_req = ourq;
147         dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
148
149         dt_update->dur_dt = dt;
150         dt_update->dur_batchid = 0;
151         INIT_LIST_HEAD(&dt_update->dur_cb_items);
152
153         return dt_update;
154 }
155
156 /**
157  * Destroy dt_update_request
158  *
159  * \param [in] dt_update        dt_update_request being destroyed
160  */
161 void dt_update_request_destroy(struct dt_update_request *dt_update)
162 {
163         if (dt_update == NULL)
164                 return;
165
166         object_update_request_free(dt_update->dur_buf.ub_req,
167                                    dt_update->dur_buf.ub_req_size);
168         OBD_FREE_PTR(dt_update);
169 }
170
171 /**
172  * Allocate an osp request and initialize it with the given parameters.
173  *
174  * \param[in] obj               pointer to the operation target
175  * \param[in] data              pointer to the data used by the interpreter
176  * \param[in] interpreter       pointer to the interpreter function
177  *
178  * \retval                      pointer to the asychronous request
179  * \retval                      NULL if the allocation failed
180  */
181 static struct osp_update_callback *
182 osp_update_callback_init(struct osp_object *obj, void *data,
183                          osp_update_interpreter_t interpreter)
184 {
185         struct osp_update_callback *ouc;
186
187         OBD_ALLOC_PTR(ouc);
188         if (ouc == NULL)
189                 return NULL;
190
191         lu_object_get(osp2lu_obj(obj));
192         INIT_LIST_HEAD(&ouc->ouc_list);
193         ouc->ouc_obj = obj;
194         ouc->ouc_data = data;
195         ouc->ouc_interpreter = interpreter;
196
197         return ouc;
198 }
199
200 /**
201  * Destroy the osp_update_callback.
202  *
203  * \param[in] env       pointer to the thread context
204  * \param[in] ouc       pointer to osp_update_callback
205  */
206 static void osp_update_callback_fini(const struct lu_env *env,
207                                      struct osp_update_callback *ouc)
208 {
209         LASSERT(list_empty(&ouc->ouc_list));
210
211         lu_object_put(env, osp2lu_obj(ouc->ouc_obj));
212         OBD_FREE_PTR(ouc);
213 }
214
215 /**
216  * Interpret the packaged OUT RPC results.
217  *
218  * For every packaged sub-request, call its registered interpreter function.
219  * Then destroy the sub-request.
220  *
221  * \param[in] env       pointer to the thread context
222  * \param[in] req       pointer to the RPC
223  * \param[in] arg       pointer to data used by the interpreter
224  * \param[in] rc        the RPC return value
225  *
226  * \retval              0 for success
227  * \retval              negative error number on failure
228  */
229 static int osp_update_interpret(const struct lu_env *env,
230                                 struct ptlrpc_request *req, void *arg, int rc)
231 {
232         struct object_update_reply      *reply  = NULL;
233         struct osp_update_args          *oaua   = arg;
234         struct dt_update_request        *dt_update = oaua->oaua_update;
235         struct osp_update_callback      *ouc;
236         struct osp_update_callback      *next;
237         int                              count  = 0;
238         int                              index  = 0;
239         int                              rc1    = 0;
240
241         if (oaua->oaua_flow_control)
242                 obd_put_request_slot(
243                                 &dt2osp_dev(dt_update->dur_dt)->opd_obd->u.cli);
244
245         /* Unpack the results from the reply message. */
246         if (req->rq_repmsg != NULL) {
247                 reply = req_capsule_server_sized_get(&req->rq_pill,
248                                                      &RMF_OUT_UPDATE_REPLY,
249                                                      OUT_UPDATE_REPLY_SIZE);
250                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
251                         rc1 = -EPROTO;
252                 else
253                         count = reply->ourp_count;
254         } else {
255                 rc1 = rc;
256         }
257
258         list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items,
259                                  ouc_list) {
260                 list_del_init(&ouc->ouc_list);
261
262                 /* The peer may only have handled some requests (indicated
263                  * by the 'count') in the packaged OUT RPC, we can only get
264                  * results for the handled part. */
265                 if (index < count && reply->ourp_lens[index] > 0) {
266                         struct object_update_result *result;
267
268                         result = object_update_result_get(reply, index, NULL);
269                         if (result == NULL)
270                                 rc1 = -EPROTO;
271                         else
272                                 rc1 = result->our_rc;
273                 } else {
274                         rc1 = rc;
275                         if (unlikely(rc1 == 0))
276                                 rc1 = -EINVAL;
277                 }
278
279                 if (ouc->ouc_interpreter != NULL)
280                         ouc->ouc_interpreter(env, reply, req, ouc->ouc_obj,
281                                              ouc->ouc_data, index, rc1);
282
283                 osp_update_callback_fini(env, ouc);
284                 index++;
285         }
286
287         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
288                 wake_up_all(oaua->oaua_waitq);
289
290         dt_update_request_destroy(dt_update);
291
292         return 0;
293 }
294
295 /**
296  * Pack all the requests in the shared asynchronous idempotent request queue
297  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
298  *
299  * \param[in] env       pointer to the thread context
300  * \param[in] osp       pointer to the OSP device
301  * \param[in] update    pointer to the shared queue
302  *
303  * \retval              0 for success
304  * \retval              negative error number on failure
305  */
306 int osp_unplug_async_request(const struct lu_env *env,
307                              struct osp_device *osp,
308                              struct dt_update_request *update)
309 {
310         struct osp_update_args  *args;
311         struct ptlrpc_request   *req = NULL;
312         int                      rc;
313
314         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
315                                  update->dur_buf.ub_req, &req);
316         if (rc != 0) {
317                 struct osp_update_callback *ouc;
318                 struct osp_update_callback *next;
319
320                 list_for_each_entry_safe(ouc, next,
321                                          &update->dur_cb_items, ouc_list) {
322                         list_del_init(&ouc->ouc_list);
323                         if (ouc->ouc_interpreter != NULL)
324                                 ouc->ouc_interpreter(env, NULL, NULL,
325                                                      ouc->ouc_obj,
326                                                      ouc->ouc_data, 0, rc);
327                         osp_update_callback_fini(env, ouc);
328                 }
329                 dt_update_request_destroy(update);
330         } else {
331                 args = ptlrpc_req_async_args(req);
332                 args->oaua_update = update;
333                 args->oaua_count = NULL;
334                 args->oaua_waitq = NULL;
335                 args->oaua_flow_control = false;
336                 req->rq_interpret_reply = osp_update_interpret;
337                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
338         }
339
340         return rc;
341 }
342
343 /**
344  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
345  * request queue - osp_device::opd_async_requests.
346  *
347  * If the osp_device::opd_async_requests is not NULL, then return it directly;
348  * otherwise create new dt_update_request and attach it to opd_async_requests.
349  *
350  * \param[in] osp       pointer to the OSP device
351  *
352  * \retval              pointer to the shared queue
353  * \retval              negative error number on failure
354  */
355 static struct dt_update_request *
356 osp_find_or_create_async_update_request(struct osp_device *osp)
357 {
358         struct dt_update_request *update = osp->opd_async_requests;
359
360         if (update != NULL)
361                 return update;
362
363         update = dt_update_request_create(&osp->opd_dt_dev);
364         if (!IS_ERR(update))
365                 osp->opd_async_requests = update;
366
367         return update;
368 }
369
370 /**
371  * Insert an osp_update_callback into the dt_update_request.
372  *
373  * Insert an osp_update_callback to the dt_update_request. Usually each update
374  * in the dt_update_request will have one correspondent callback, and these
375  * callbacks will be called in rq_interpret_reply.
376  *
377  * \param[in] env               pointer to the thread context
378  * \param[in] obj               pointer to the operation target object
379  * \param[in] data              pointer to the data used by the interpreter
380  * \param[in] interpreter       pointer to the interpreter function
381  *
382  * \retval                      0 for success
383  * \retval                      negative error number on failure
384  */
385 int osp_insert_update_callback(const struct lu_env *env,
386                                struct dt_update_request *update,
387                                struct osp_object *obj, void *data,
388                                osp_update_interpreter_t interpreter)
389 {
390         struct osp_update_callback  *ouc;
391
392         ouc = osp_update_callback_init(obj, data, interpreter);
393         if (ouc == NULL)
394                 RETURN(-ENOMEM);
395
396         list_add_tail(&ouc->ouc_list, &update->dur_cb_items);
397
398         return 0;
399 }
400
401 /**
402  * Insert an asynchronous idempotent request to the shared request queue that
403  * is attached to the osp_device.
404  *
405  * This function generates a new osp_async_request with the given parameters,
406  * then tries to insert the request into the osp_device-based shared request
407  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
408  * the shared queue firstly, and then re-tries.
409  *
410  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
411  *       osp_insert_async_request call from others.
412  *
413  * \param[in] env               pointer to the thread context
414  * \param[in] op                operation type, see 'enum update_type'
415  * \param[in] obj               pointer to the operation target
416  * \param[in] count             array size of the subsequent \a lens and \a bufs
417  * \param[in] lens              buffer length array for the subsequent \a bufs
418  * \param[in] bufs              the buffers to compose the request
419  * \param[in] data              pointer to the data used by the interpreter
420  * \param[in] interpreter       pointer to the interpreter function
421  *
422  * \retval                      0 for success
423  * \retval                      negative error number on failure
424  */
425 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
426                              struct osp_object *obj, int count,
427                              __u16 *lens, const void **bufs, void *data,
428                              osp_update_interpreter_t interpreter)
429 {
430         struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
431         struct dt_update_request     *update;
432         int                           rc  = 0;
433         ENTRY;
434
435         update = osp_find_or_create_async_update_request(osp);
436         if (IS_ERR(update))
437                 RETURN(PTR_ERR(update));
438
439 again:
440         /* The queue is full. */
441         rc = out_update_pack(env, &update->dur_buf, op,
442                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
443                              0);
444         if (rc == -E2BIG) {
445                 osp->opd_async_requests = NULL;
446                 mutex_unlock(&osp->opd_async_requests_mutex);
447
448                 rc = osp_unplug_async_request(env, osp, update);
449                 mutex_lock(&osp->opd_async_requests_mutex);
450                 if (rc != 0)
451                         RETURN(rc);
452
453                 update = osp_find_or_create_async_update_request(osp);
454                 if (IS_ERR(update))
455                         RETURN(PTR_ERR(update));
456
457                 goto again;
458         }
459
460         rc = osp_insert_update_callback(env, update, obj, data, interpreter);
461
462         RETURN(rc);
463 }
464
465 int osp_trans_update_request_create(struct thandle *th)
466 {
467         struct osp_thandle              *oth = thandle_to_osp_thandle(th);
468         struct dt_update_request        *update;
469
470         if (oth->ot_dur != NULL)
471                 return 0;
472
473         update = dt_update_request_create(th->th_dev);
474         if (IS_ERR(update)) {
475                 th->th_result = PTR_ERR(update);
476                 return PTR_ERR(update);
477         }
478
479         oth->ot_dur = update;
480         return 0;
481 }
482 /**
483  * The OSP layer dt_device_operations::dt_trans_create() interface
484  * to create a transaction.
485  *
486  * There are two kinds of transactions that will involve OSP:
487  *
488  * 1) If the transaction only contains the updates on remote server
489  *    (MDT or OST), such as re-generating the lost OST-object for
490  *    LFSCK, then it is a remote transaction. For remote transaction,
491  *    the upper layer caller (such as the LFSCK engine) will call the
492  *    dt_trans_create() (with the OSP dt_device as the parameter),
493  *    then the call will be directed to the osp_trans_create() that
494  *    creates the transaction handler and returns it to the caller.
495  *
496  * 2) If the transcation contains both local and remote updates,
497  *    such as cross MDTs create under DNE mode, then the upper layer
498  *    caller will not trigger osp_trans_create(). Instead, it will
499  *    call dt_trans_create() on other dt_device, such as LOD that
500  *    will generate the transaction handler. Such handler will be
501  *    used by the whole transaction in subsequent sub-operations.
502  *
503  * \param[in] env       pointer to the thread context
504  * \param[in] d         pointer to the OSP dt_device
505  *
506  * \retval              pointer to the transaction handler
507  * \retval              negative error number on failure
508  */
509 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
510 {
511         struct osp_thandle              *oth;
512         struct thandle                  *th = NULL;
513         ENTRY;
514
515         OBD_ALLOC_PTR(oth);
516         if (unlikely(oth == NULL))
517                 RETURN(ERR_PTR(-ENOMEM));
518
519         th = &oth->ot_super;
520         th->th_dev = d;
521         th->th_tags = LCT_TX_HANDLE;
522
523         RETURN(th);
524 }
525
526 /**
527  * Prepare update request.
528  *
529  * Prepare OUT update ptlrpc request, and the request usually includes
530  * all of updates (stored in \param ureq) from one operation.
531  *
532  * \param[in] env       execution environment
533  * \param[in] imp       import on which ptlrpc request will be sent
534  * \param[in] ureq      hold all of updates which will be packed into the req
535  * \param[in] reqp      request to be created
536  *
537  * \retval              0 if preparation succeeds.
538  * \retval              negative errno if preparation fails.
539  */
540 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
541                         const struct object_update_request *ureq,
542                         struct ptlrpc_request **reqp)
543 {
544         struct ptlrpc_request           *req;
545         struct object_update_request    *tmp;
546         int                             ureq_len;
547         int                             rc;
548         ENTRY;
549
550         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
551         if (req == NULL)
552                 RETURN(-ENOMEM);
553
554         ureq_len = object_update_request_size(ureq);
555         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
556                              ureq_len);
557
558         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
559         if (rc != 0) {
560                 ptlrpc_req_finished(req);
561                 RETURN(rc);
562         }
563
564         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
565                              RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
566
567         tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
568         memcpy(tmp, ureq, ureq_len);
569
570         ptlrpc_request_set_replen(req);
571         req->rq_request_portal = OUT_PORTAL;
572         req->rq_reply_portal = OSC_REPLY_PORTAL;
573         *reqp = req;
574
575         RETURN(rc);
576 }
577
578 /**
579  * Send update RPC.
580  *
581  * Send update request to the remote MDT synchronously.
582  *
583  * \param[in] env       execution environment
584  * \param[in] imp       import on which ptlrpc request will be sent
585  * \param[in] dt_update hold all of updates which will be packed into the req
586  * \param[in] reqp      request to be created
587  *
588  * \retval              0 if RPC succeeds.
589  * \retval              negative errno if RPC fails.
590  */
591 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
592                     struct dt_update_request *dt_update,
593                     struct ptlrpc_request **reqp)
594 {
595         struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
596         struct ptlrpc_request   *req = NULL;
597         int                     rc;
598         ENTRY;
599
600         rc = osp_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req);
601         if (rc != 0)
602                 RETURN(rc);
603
604         /* Note: some dt index api might return non-zero result here, like
605          * osd_index_ea_lookup, so we should only check rc < 0 here */
606         rc = ptlrpc_queue_wait(req);
607         if (rc < 0) {
608                 ptlrpc_req_finished(req);
609                 dt_update->dur_rc = rc;
610                 RETURN(rc);
611         }
612
613         if (reqp != NULL) {
614                 *reqp = req;
615                 RETURN(rc);
616         }
617
618         dt_update->dur_rc = rc;
619
620         ptlrpc_req_finished(req);
621
622         RETURN(rc);
623 }
624
625 /**
626  * Trigger the request for remote updates.
627  *
628  * If th_sync is set, then the request will be sent synchronously,
629  * otherwise, the RPC will be sent asynchronously.
630  *
631  * Please refer to osp_trans_create() for transaction type.
632  *
633  * \param[in] env               pointer to the thread context
634  * \param[in] osp               pointer to the OSP device
635  * \param[in] dt_update         pointer to the dt_update_request
636  * \param[in] th                pointer to the transaction handler
637  * \param[out] sent             whether the RPC has been sent
638  *
639  * \retval                      0 for success
640  * \retval                      negative error number on failure
641  */
642 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
643                              struct dt_update_request *dt_update,
644                              struct thandle *th, int *sent)
645 {
646         struct osp_update_args  *args;
647         struct ptlrpc_request   *req;
648         int     rc = 0;
649         ENTRY;
650
651         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
652                                  dt_update->dur_buf.ub_req, &req);
653         if (rc != 0)
654                 RETURN(rc);
655
656         *sent = 1;
657         req->rq_interpret_reply = osp_update_interpret;
658         args = ptlrpc_req_async_args(req);
659         args->oaua_update = dt_update;
660         if (is_only_remote_trans(th) && !th->th_sync) {
661                 args->oaua_flow_control = true;
662
663                 if (!osp->opd_connect_mdt) {
664                         down_read(&osp->opd_async_updates_rwsem);
665                         args->oaua_count = &osp->opd_async_updates_count;
666                         args->oaua_waitq = &osp->opd_syn_barrier_waitq;
667                         up_read(&osp->opd_async_updates_rwsem);
668                         atomic_inc(args->oaua_count);
669                 }
670
671                 ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
672         } else {
673                 osp_get_rpc_lock(osp);
674                 args->oaua_flow_control = false;
675                 rc = ptlrpc_queue_wait(req);
676                 osp_put_rpc_lock(osp);
677                 ptlrpc_req_finished(req);
678         }
679
680         RETURN(rc);
681 }
682
683 /**
684  * Get local thandle for osp_thandle
685  *
686  * Get the local OSD thandle from the OSP thandle. Currently, there
687  * are a few OSP API (osp_object_create() and osp_sync_add()) needs
688  * to update the object on local OSD device.
689  *
690  * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
691  * we will get local thandle by thandle_get_sub_by_dt.
692  *
693  * If the osp_thandle is remote thandle (th_top == NULL, only used
694  * by LFSCK), then it will create a local thandle, and stop it in
695  * osp_trans_stop(). And this only happens on OSP for OST.
696  *
697  * These are temporary solution, once OSP accessing OSD object is
698  * being fixed properly, this function should be removed. XXX
699  *
700  * \param[in] env               pointer to the thread context
701  * \param[in] th                pointer to the transaction handler
702  * \param[in] dt                pointer to the OSP device
703  *
704  * \retval                      pointer to the local thandle
705  * \retval                      ERR_PTR(errno) if it fails.
706  **/
707 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
708                                         struct thandle *th,
709                                         struct osp_device *osp)
710 {
711         struct osp_thandle      *oth;
712         struct thandle          *local_th;
713
714         if (th->th_top != NULL)
715                 return thandle_get_sub_by_dt(env, th->th_top,
716                                              osp->opd_storage);
717
718         LASSERT(!osp->opd_connect_mdt);
719         oth = thandle_to_osp_thandle(th);
720         if (oth->ot_storage_th != NULL)
721                 return oth->ot_storage_th;
722
723         local_th = dt_trans_create(env, osp->opd_storage);
724         if (IS_ERR(local_th))
725                 return local_th;
726
727         oth->ot_storage_th = local_th;
728
729         return local_th;
730 }
731
732 /**
733  * The OSP layer dt_device_operations::dt_trans_start() interface
734  * to start the transaction.
735  *
736  * If the transaction is a remote transaction, then related remote
737  * updates will be triggered in the osp_trans_stop().
738  * Please refer to osp_trans_create() for transaction type.
739  *
740  * \param[in] env               pointer to the thread context
741  * \param[in] dt                pointer to the OSP dt_device
742  * \param[in] th                pointer to the transaction handler
743  *
744  * \retval                      0 for success
745  * \retval                      negative error number on failure
746  */
747 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
748                     struct thandle *th)
749 {
750         struct osp_thandle      *oth = thandle_to_osp_thandle(th);
751
752         /* For remote thandle, if there are local thandle, start it here*/
753         if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
754                 return dt_trans_start(env, oth->ot_storage_th->th_dev,
755                                       oth->ot_storage_th);
756         return 0;
757 }
758
759 /**
760  * The OSP layer dt_device_operations::dt_trans_stop() interface
761  * to stop the transaction.
762  *
763  * If the transaction is a remote transaction, related remote
764  * updates will be triggered here via osp_trans_trigger().
765  *
766  * For synchronous mode update or any failed update, the request
767  * will be destroyed explicitly when the osp_trans_stop().
768  *
769  * Please refer to osp_trans_create() for transaction type.
770  *
771  * \param[in] env               pointer to the thread context
772  * \param[in] dt                pointer to the OSP dt_device
773  * \param[in] th                pointer to the transaction handler
774  *
775  * \retval                      0 for success
776  * \retval                      negative error number on failure
777  */
778 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
779                    struct thandle *th)
780 {
781         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
782         struct dt_update_request *dt_update;
783         int                      rc = 0;
784         int                      sent = 0;
785         ENTRY;
786
787         /* For remote transaction, if there is local storage thandle,
788          * stop it first */
789         if (oth->ot_storage_th != NULL && th->th_top == NULL) {
790                 dt_trans_stop(env, oth->ot_storage_th->th_dev,
791                               oth->ot_storage_th);
792                 oth->ot_storage_th = NULL;
793         }
794
795         dt_update = oth->ot_dur;
796         if (dt_update == NULL)
797                 GOTO(out, rc);
798
799         LASSERT(dt_update != LP_POISON);
800
801         /* If there are no updates, destroy dt_update and thandle */
802         if (dt_update->dur_buf.ub_req == NULL ||
803             dt_update->dur_buf.ub_req->ourq_count == 0) {
804                 dt_update_request_destroy(dt_update);
805                 GOTO(out, rc);
806         }
807
808         if (is_only_remote_trans(th) && !th->th_sync) {
809                 struct osp_device *osp = dt2osp_dev(th->th_dev);
810                 struct client_obd *cli = &osp->opd_obd->u.cli;
811
812                 rc = obd_get_request_slot(cli);
813                 if (rc != 0)
814                         GOTO(out, rc);
815
816                 if (!osp->opd_imp_active || !osp->opd_imp_connected) {
817                         obd_put_request_slot(cli);
818                         GOTO(out, rc = -ENOTCONN);
819                 }
820
821                 rc = osp_trans_trigger(env, dt2osp_dev(dt),
822                                        dt_update, th, &sent);
823                 if (rc != 0)
824                         obd_put_request_slot(cli);
825         } else {
826                 rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
827                                        th, &sent);
828         }
829
830 out:
831         /* If RPC is triggered successfully, dt_update will be freed in
832          * osp_update_interpreter() */
833         if (rc != 0 && dt_update != NULL && sent == 0) {
834                 struct osp_update_callback *ouc;
835                 struct osp_update_callback *next;
836
837                 list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items,
838                                  ouc_list) {
839                         list_del_init(&ouc->ouc_list);
840                         if (ouc->ouc_interpreter != NULL)
841                                 ouc->ouc_interpreter(env, NULL, NULL,
842                                                      ouc->ouc_obj,
843                                                      ouc->ouc_data, 0, rc);
844                         osp_update_callback_fini(env, ouc);
845                 }
846
847                 dt_update_request_destroy(dt_update);
848         }
849
850         OBD_FREE_PTR(oth);
851
852         RETURN(rc);
853 }