Whamcloud - gitweb
LU-7931 tests: setup/cleanup after every test script
[fs/lustre-release.git] / lustre / osp / osp_trans.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, 2015, Intel Corporation.
24  */
25 /*
26  * lustre/osp/osp_trans.c
27  *
28  *
29  * 1. OSP (Object Storage Proxy) transaction methods
30  *
31  * Implement OSP layer transaction related interfaces for the dt_device API
32  * dt_device_operations.
33  *
34  *
35  * 2. Handle asynchronous idempotent operations
36  *
37  * The OSP uses OUT (Object Unified Target) RPC to talk with other server
38  * (MDT or OST) for kinds of operations, such as create, unlink, insert,
39  * delete, lookup, set_(x)attr, get_(x)attr, and etc. To reduce the number
40  * of RPCs, we allow multiple operations to be packaged together in single
41  * OUT RPC.
42  *
43  * For the asynchronous idempotent operations, such as get_(x)attr, related
44  * RPCs will be inserted into an osp_device based shared asynchronous request
45  * queue - osp_device::opd_async_requests. When the queue is full, all the
46  * requests in the queue will be packaged into a single OUT RPC and given to
47  * the ptlrpcd daemon (for sending), then the queue is purged and other new
48  * requests can be inserted into it.
49  *
50  * When the asynchronous idempotent operation inserts the request into the
51  * shared queue, it will register an interpreter. When the packaged OUT RPC
52  * is replied (or failed to be sent out), all the registered interpreters
53  * will be called one by one to handle each own result.
54  *
55  *
56  * There are three kinds of transactions
57  *
58  * 1. Local transaction, all of updates of the transaction are in the local MDT.
59  * 2. Remote transaction, all of updates of the transaction are in one remote
60  * MDT, which only happens in LFSCK now.
61  * 3. Distribute transaction, updates for the transaction are in mulitple MDTs.
62  *
63  * Author: Di Wang <di.wang@intel.com>
64  * Author: Fan, Yong <fan.yong@intel.com>
65  */
66
67 #define DEBUG_SUBSYSTEM S_MDS
68
69 #include <lustre_net.h>
70 #include "osp_internal.h"
71
72 /**
73  * The argument for the interpreter callback of osp request.
74  */
75 struct osp_update_args {
76         struct osp_update_request *oaua_update;
77         atomic_t                 *oaua_count;
78         wait_queue_head_t        *oaua_waitq;
79         bool                      oaua_flow_control;
80         const struct lu_env      *oaua_update_env;
81 };
82
83 /**
84  * Call back for each update request.
85  */
86 struct osp_update_callback {
87         /* list in the osp_update_request::our_cb_items */
88         struct list_head                 ouc_list;
89
90         /* The target of the async update request. */
91         struct osp_object               *ouc_obj;
92
93         /* The data used by or_interpreter. */
94         void                            *ouc_data;
95
96         /* The interpreter function called after the async request handled. */
97         osp_update_interpreter_t        ouc_interpreter;
98 };
99
100 static struct object_update_request *object_update_request_alloc(size_t size)
101 {
102         struct object_update_request *ourq;
103
104         OBD_ALLOC_LARGE(ourq, size);
105         if (ourq == NULL)
106                 return ERR_PTR(-ENOMEM);
107
108         ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
109         ourq->ourq_count = 0;
110
111         return ourq;
112 }
113
114 /**
115  * Allocate new update request
116  *
117  * Allocate new update request and insert it to the req_update_list.
118  *
119  * \param [in] our      osp_udate_request where to create a new
120  *                      update request
121  *
122  * \retval      0 if creation succeeds.
123  * \retval      negative errno if creation fails.
124  */
125 int osp_object_update_request_create(struct osp_update_request *our,
126                                      size_t size)
127 {
128         struct osp_update_request_sub *ours;
129
130         OBD_ALLOC_PTR(ours);
131         if (ours == NULL)
132                 return -ENOMEM;
133
134         if (size < OUT_UPDATE_INIT_BUFFER_SIZE)
135                 size = OUT_UPDATE_INIT_BUFFER_SIZE;
136
137         ours->ours_req = object_update_request_alloc(size);
138
139         if (IS_ERR(ours->ours_req)) {
140                 OBD_FREE_PTR(ours);
141                 return -ENOMEM;
142         }
143
144         ours->ours_req_size = size;
145         INIT_LIST_HEAD(&ours->ours_list);
146         list_add_tail(&ours->ours_list, &our->our_req_list);
147         our->our_req_nr++;
148
149         return 0;
150 }
151
152 /**
153  * Get current update request
154  *
155  * Get current object update request from our_req_list in
156  * osp_update_request, because we always insert the new update
157  * request in the last position, so the last update request
158  * in the list will be the current update req.
159  *
160  * \param[in] our       osp update request where to get the
161  *                      current object update.
162  *
163  * \retval              the current object update.
164  **/
165 struct osp_update_request_sub *
166 osp_current_object_update_request(struct osp_update_request *our)
167 {
168         if (list_empty(&our->our_req_list))
169                 return NULL;
170
171         return list_entry(our->our_req_list.prev, struct osp_update_request_sub,
172                           ours_list);
173 }
174
175 /**
176  * Allocate and initialize osp_update_request
177  *
178  * osp_update_request is being used to track updates being executed on
179  * this dt_device(OSD or OSP). The update buffer will be 4k initially,
180  * and increased if needed.
181  *
182  * \param [in] dt       dt device
183  *
184  * \retval              osp_update_request being allocated if succeed
185  * \retval              ERR_PTR(errno) if failed
186  */
187 struct osp_update_request *osp_update_request_create(struct dt_device *dt)
188 {
189         struct osp_update_request *our;
190
191         OBD_ALLOC_PTR(our);
192         if (our == NULL)
193                 return ERR_PTR(-ENOMEM);
194
195         INIT_LIST_HEAD(&our->our_req_list);
196         INIT_LIST_HEAD(&our->our_cb_items);
197         INIT_LIST_HEAD(&our->our_list);
198         spin_lock_init(&our->our_list_lock);
199
200         osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
201         return our;
202 }
203
204 void osp_update_request_destroy(struct osp_update_request *our)
205 {
206         struct osp_update_request_sub *ours;
207         struct osp_update_request_sub *tmp;
208
209         if (our == NULL)
210                 return;
211
212         list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) {
213                 list_del(&ours->ours_list);
214                 if (ours->ours_req != NULL)
215                         OBD_FREE_LARGE(ours->ours_req, ours->ours_req_size);
216                 OBD_FREE_PTR(ours);
217         }
218         OBD_FREE_PTR(our);
219 }
220
221 static void
222 object_update_request_dump(const struct object_update_request *ourq,
223                            unsigned int mask)
224 {
225         unsigned int i;
226         size_t total_size = 0;
227
228         for (i = 0; i < ourq->ourq_count; i++) {
229                 struct object_update    *update;
230                 size_t                  size = 0;
231
232                 update = object_update_request_get(ourq, i, &size);
233                 LASSERT(update != NULL);
234                 CDEBUG(mask, "i = %u fid = "DFID" op = %s "
235                        "params = %d batchid = "LPU64" size = %zu repsize %u\n",
236                        i, PFID(&update->ou_fid),
237                        update_op_str(update->ou_type),
238                        update->ou_params_count,
239                        update->ou_batchid, size,
240                        (unsigned)update->ou_result_size);
241
242                 total_size += size;
243         }
244
245         CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
246                ourq->ourq_magic, ourq->ourq_count, total_size);
247 }
248
249 /**
250  * Prepare inline update request
251  *
252  * Prepare OUT update ptlrpc inline request, and the request usually includes
253  * one update buffer, which does not need bulk transfer.
254  *
255  * \param[in] env       execution environment
256  * \param[in] req       ptlrpc request
257  * \param[in] ours      sub osp_update_request to be packed
258  *
259  * \retval              0 if packing succeeds
260  * \retval              negative errno if packing fails
261  */
262 int osp_prep_inline_update_req(const struct lu_env *env,
263                                struct ptlrpc_request *req,
264                                struct osp_update_request *our,
265                                int repsize)
266 {
267         struct osp_update_request_sub *ours;
268         struct out_update_header *ouh;
269         __u32 update_req_size;
270         int rc;
271
272         ours = list_entry(our->our_req_list.next,
273                           struct osp_update_request_sub, ours_list);
274         update_req_size = object_update_request_size(ours->ours_req);
275         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
276                              update_req_size + sizeof(*ouh));
277
278         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
279         if (rc != 0)
280                 RETURN(rc);
281
282         ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
283         ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
284         ouh->ouh_count = 1;
285         ouh->ouh_inline_length = update_req_size;
286         ouh->ouh_reply_size = repsize;
287
288         memcpy(ouh->ouh_inline_data, ours->ours_req, update_req_size);
289
290         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
291                              RCL_SERVER, repsize);
292
293         ptlrpc_request_set_replen(req);
294         req->rq_request_portal = OUT_PORTAL;
295         req->rq_reply_portal = OSC_REPLY_PORTAL;
296
297         RETURN(rc);
298 }
299
300 /**
301  * Prepare update request.
302  *
303  * Prepare OUT update ptlrpc request, and the request usually includes
304  * all of updates (stored in \param ureq) from one operation.
305  *
306  * \param[in] env       execution environment
307  * \param[in] imp       import on which ptlrpc request will be sent
308  * \param[in] ureq      hold all of updates which will be packed into the req
309  * \param[in] reqp      request to be created
310  *
311  * \retval              0 if preparation succeeds.
312  * \retval              negative errno if preparation fails.
313  */
314 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
315                         struct osp_update_request *our,
316                         struct ptlrpc_request **reqp)
317 {
318         struct ptlrpc_request           *req;
319         struct ptlrpc_bulk_desc         *desc;
320         struct osp_update_request_sub   *ours;
321         const struct object_update_request *ourq;
322         struct out_update_header        *ouh;
323         struct out_update_buffer        *oub;
324         __u32                           buf_count = 0;
325         int                             repsize = 0;
326         struct object_update_reply      *reply;
327         int                             rc, i;
328         int                             total = 0;
329         ENTRY;
330
331         list_for_each_entry(ours, &our->our_req_list, ours_list) {
332                 object_update_request_dump(ours->ours_req, D_INFO);
333
334                 ourq = ours->ours_req;
335                 for (i = 0; i < ourq->ourq_count; i++) {
336                         struct object_update    *update;
337                         size_t                  size = 0;
338
339
340                         /* XXX: it's very inefficient to lookup update
341                          *      this way, iterating from the beginning
342                          *      each time */
343                         update = object_update_request_get(ourq, i, &size);
344                         LASSERT(update != NULL);
345
346                         repsize += sizeof(reply->ourp_lens[0]);
347                         repsize += sizeof(struct object_update_result);
348                         repsize += update->ou_result_size;
349                 }
350
351                 buf_count++;
352         }
353         repsize += sizeof(*reply);
354         repsize = (repsize + OUT_UPDATE_REPLY_SIZE - 1) &
355                         ~(OUT_UPDATE_REPLY_SIZE - 1);
356         LASSERT(buf_count > 0);
357
358         req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
359         if (req == NULL)
360                 RETURN(-ENOMEM);
361
362         if (buf_count == 1) {
363                 ours = list_entry(our->our_req_list.next,
364                                   struct osp_update_request_sub, ours_list);
365
366                 /* Let's check if it can be packed inline */
367                 if (object_update_request_size(ours->ours_req) +
368                     sizeof(struct out_update_header) <
369                                 OUT_UPDATE_MAX_INLINE_SIZE) {
370                         rc = osp_prep_inline_update_req(env, req, our, repsize);
371                         if (rc == 0)
372                                 *reqp = req;
373                         GOTO(out_req, rc);
374                 }
375         }
376
377         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
378                              sizeof(struct osp_update_request));
379
380         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
381                              buf_count * sizeof(*oub));
382
383         rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
384         if (rc != 0)
385                 GOTO(out_req, rc);
386
387         ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
388         ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
389         ouh->ouh_count = buf_count;
390         ouh->ouh_inline_length = 0;
391         ouh->ouh_reply_size = repsize;
392         oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
393         list_for_each_entry(ours, &our->our_req_list, ours_list) {
394                 oub->oub_size = ours->ours_req_size;
395                 oub++;
396         }
397
398         req->rq_bulk_write = 1;
399         desc = ptlrpc_prep_bulk_imp(req, buf_count,
400                 MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
401                 PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
402                 MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
403         if (desc == NULL)
404                 GOTO(out_req, rc = -ENOMEM);
405
406         /* NB req now owns desc and will free it when it gets freed */
407         list_for_each_entry(ours, &our->our_req_list, ours_list) {
408                 desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
409                                                 ours->ours_req_size);
410                 total += ours->ours_req_size;
411         }
412         CDEBUG(D_OTHER, "total %d in %u\n", total, our->our_update_nr);
413
414         req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
415                              RCL_SERVER, repsize);
416
417         ptlrpc_request_set_replen(req);
418         req->rq_request_portal = OUT_PORTAL;
419         req->rq_reply_portal = OSC_REPLY_PORTAL;
420         *reqp = req;
421
422 out_req:
423         if (rc < 0)
424                 ptlrpc_req_finished(req);
425
426         RETURN(rc);
427 }
428
429 /**
430  * Send update RPC.
431  *
432  * Send update request to the remote MDT synchronously.
433  *
434  * \param[in] env       execution environment
435  * \param[in] imp       import on which ptlrpc request will be sent
436  * \param[in] our       hold all of updates which will be packed into the req
437  * \param[in] reqp      request to be created
438  *
439  * \retval              0 if RPC succeeds.
440  * \retval              negative errno if RPC fails.
441  */
442 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
443                     struct osp_update_request *our,
444                     struct ptlrpc_request **reqp)
445 {
446         struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
447         struct ptlrpc_request   *req = NULL;
448         int                     rc;
449         ENTRY;
450
451         rc = osp_prep_update_req(env, imp, our, &req);
452         if (rc != 0)
453                 RETURN(rc);
454
455         /* This will only be called with read-only update, and these updates
456          * might be used to retrieve update log during recovery process, so
457          * it will be allowed to send during recovery process */
458         req->rq_allow_replay = 1;
459         req->rq_allow_intr = 1;
460
461         /* Note: some dt index api might return non-zero result here, like
462          * osd_index_ea_lookup, so we should only check rc < 0 here */
463         rc = ptlrpc_queue_wait(req);
464         our->our_rc = rc;
465         if (rc < 0 || reqp == NULL)
466                 ptlrpc_req_finished(req);
467         else
468                 *reqp = req;
469
470         RETURN(rc);
471 }
472
473 /**
474  * Invalidate all objects in the osp thandle
475  *
476  * invalidate all of objects in the update request, which will be called
477  * when the transaction is aborted.
478  *
479  * \param[in] oth       osp thandle.
480  */
481 static void osp_thandle_invalidate_object(const struct lu_env *env,
482                                           struct osp_thandle *oth)
483 {
484         struct osp_update_request *our = oth->ot_our;
485         struct osp_update_request_sub *ours;
486
487         if (our == NULL)
488                 return;
489
490         list_for_each_entry(ours, &our->our_req_list, ours_list) {
491                 struct object_update_request *our_req = ours->ours_req;
492                 unsigned int i;
493                 struct lu_object *obj;
494                 struct osp_object *osp_obj;
495
496                 for (i = 0; i < our_req->ourq_count; i++) {
497                         struct object_update *update;
498
499                         update = object_update_request_get(our_req, i, NULL);
500                         if (update == NULL)
501                                 break;
502
503                         if (update->ou_type != OUT_WRITE)
504                                 continue;
505
506                         if (!fid_is_sane(&update->ou_fid))
507                                 continue;
508
509                         obj = lu_object_find_slice(env,
510                                         &oth->ot_super.th_dev->dd_lu_dev,
511                                         &update->ou_fid, NULL);
512                         if (IS_ERR(obj))
513                                 break;
514
515                         osp_obj = lu2osp_obj(obj);
516                         if (osp_obj->opo_ooa != NULL) {
517                                 spin_lock(&osp_obj->opo_lock);
518                                 osp_obj->opo_ooa->ooa_attr.la_valid = 0;
519                                 osp_obj->opo_stale = 1;
520                                 spin_unlock(&osp_obj->opo_lock);
521                         }
522                         lu_object_put(env, obj);
523                 }
524         }
525 }
526
527 static void osp_trans_stop_cb(const struct lu_env *env,
528                               struct osp_thandle *oth, int result)
529 {
530         struct dt_txn_commit_cb *dcb;
531         struct dt_txn_commit_cb *tmp;
532
533         /* call per-transaction stop callbacks if any */
534         list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
535                                  dcb_linkage) {
536                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
537                          "commit callback entry: magic=%x name='%s'\n",
538                          dcb->dcb_magic, dcb->dcb_name);
539                 list_del_init(&dcb->dcb_linkage);
540                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
541         }
542
543         if (result < 0)
544                 osp_thandle_invalidate_object(env, oth);
545 }
546
547 /**
548  * Allocate an osp request and initialize it with the given parameters.
549  *
550  * \param[in] obj               pointer to the operation target
551  * \param[in] data              pointer to the data used by the interpreter
552  * \param[in] interpreter       pointer to the interpreter function
553  *
554  * \retval                      pointer to the asychronous request
555  * \retval                      NULL if the allocation failed
556  */
557 static struct osp_update_callback *
558 osp_update_callback_init(struct osp_object *obj, void *data,
559                          osp_update_interpreter_t interpreter)
560 {
561         struct osp_update_callback *ouc;
562
563         OBD_ALLOC_PTR(ouc);
564         if (ouc == NULL)
565                 return NULL;
566
567         lu_object_get(osp2lu_obj(obj));
568         INIT_LIST_HEAD(&ouc->ouc_list);
569         ouc->ouc_obj = obj;
570         ouc->ouc_data = data;
571         ouc->ouc_interpreter = interpreter;
572
573         return ouc;
574 }
575
576 /**
577  * Destroy the osp_update_callback.
578  *
579  * \param[in] env       pointer to the thread context
580  * \param[in] ouc       pointer to osp_update_callback
581  */
582 static void osp_update_callback_fini(const struct lu_env *env,
583                                      struct osp_update_callback *ouc)
584 {
585         LASSERT(list_empty(&ouc->ouc_list));
586
587         lu_object_put(env, osp2lu_obj(ouc->ouc_obj));
588         OBD_FREE_PTR(ouc);
589 }
590
591 /**
592  * Interpret the packaged OUT RPC results.
593  *
594  * For every packaged sub-request, call its registered interpreter function.
595  * Then destroy the sub-request.
596  *
597  * \param[in] env       pointer to the thread context
598  * \param[in] req       pointer to the RPC
599  * \param[in] arg       pointer to data used by the interpreter
600  * \param[in] rc        the RPC return value
601  *
602  * \retval              0 for success
603  * \retval              negative error number on failure
604  */
605 static int osp_update_interpret(const struct lu_env *env,
606                                 struct ptlrpc_request *req, void *arg, int rc)
607 {
608         struct object_update_reply      *reply  = NULL;
609         struct osp_update_args          *oaua   = arg;
610         struct osp_update_request       *our = oaua->oaua_update;
611         struct osp_thandle              *oth;
612         struct osp_update_callback      *ouc;
613         struct osp_update_callback      *next;
614         int                              count  = 0;
615         int                              index  = 0;
616         int                              rc1    = 0;
617
618         ENTRY;
619
620         if (our == NULL)
621                 RETURN(0);
622
623         /* Sigh env might be NULL in some cases, see
624          * this calling path.
625          * osp_send_update_thread()
626          *  ptlrpc_set_wait() ----> null env.
627          *   ptlrpc_check_set()
628          *    osp_update_interpret()
629          * Let's use env in oaua for this case.
630          */
631         if (env == NULL)
632                 env = oaua->oaua_update_env;
633
634         oaua->oaua_update = NULL;
635         oth = our->our_th;
636         if (oaua->oaua_flow_control) {
637                 struct osp_device *osp;
638
639                 LASSERT(oth != NULL);
640                 osp = dt2osp_dev(oth->ot_super.th_dev);
641                 obd_put_request_slot(&osp->opd_obd->u.cli);
642         }
643
644         /* Unpack the results from the reply message. */
645         if (req->rq_repmsg != NULL && req->rq_replied) {
646                 reply = req_capsule_server_sized_get(&req->rq_pill,
647                                                      &RMF_OUT_UPDATE_REPLY,
648                                                      OUT_UPDATE_REPLY_SIZE);
649                 if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) {
650                         if (rc == 0)
651                                 rc = -EPROTO;
652                 } else {
653                         count = reply->ourp_count;
654                 }
655         }
656
657         list_for_each_entry_safe(ouc, next, &our->our_cb_items, ouc_list) {
658                 list_del_init(&ouc->ouc_list);
659
660                 /* The peer may only have handled some requests (indicated
661                  * by the 'count') in the packaged OUT RPC, we can only get
662                  * results for the handled part. */
663                 if (index < count && reply->ourp_lens[index] > 0 && rc >= 0) {
664                         struct object_update_result *result;
665
666                         result = object_update_result_get(reply, index, NULL);
667                         if (result == NULL)
668                                 rc1 = rc = -EPROTO;
669                         else
670                                 rc1 = rc = result->our_rc;
671                 } else if (rc1 >= 0) {
672                         /* The peer did not handle these request, let's return
673                          * -EINVAL to update interpret for now */
674                         if (rc >= 0)
675                                 rc1 = -EINVAL;
676                         else
677                                 rc1 = rc;
678                 }
679
680                 if (ouc->ouc_interpreter != NULL)
681                         ouc->ouc_interpreter(env, reply, req, ouc->ouc_obj,
682                                              ouc->ouc_data, index, rc1);
683
684                 osp_update_callback_fini(env, ouc);
685                 index++;
686         }
687
688         if (oaua->oaua_count != NULL && atomic_dec_and_test(oaua->oaua_count))
689                 wake_up_all(oaua->oaua_waitq);
690
691         if (oth != NULL) {
692                 /* oth and osp_update_requests will be destoryed in
693                  * osp_thandle_put */
694                 osp_trans_stop_cb(env, oth, rc);
695                 osp_thandle_put(oth);
696         } else {
697                 osp_update_request_destroy(our);
698         }
699
700         RETURN(rc);
701 }
702
703 /**
704  * Pack all the requests in the shared asynchronous idempotent request queue
705  * into a single OUT RPC that will be given to the background ptlrpcd daemon.
706  *
707  * \param[in] env       pointer to the thread context
708  * \param[in] osp       pointer to the OSP device
709  * \param[in] our       pointer to the shared queue
710  *
711  * \retval              0 for success
712  * \retval              negative error number on failure
713  */
714 int osp_unplug_async_request(const struct lu_env *env,
715                              struct osp_device *osp,
716                              struct osp_update_request *our)
717 {
718         struct osp_update_args  *args;
719         struct ptlrpc_request   *req = NULL;
720         int                      rc;
721
722         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
723                                  our, &req);
724         if (rc != 0) {
725                 struct osp_update_callback *ouc;
726                 struct osp_update_callback *next;
727
728                 list_for_each_entry_safe(ouc, next,
729                                          &our->our_cb_items, ouc_list) {
730                         list_del_init(&ouc->ouc_list);
731                         if (ouc->ouc_interpreter != NULL)
732                                 ouc->ouc_interpreter(env, NULL, NULL,
733                                                      ouc->ouc_obj,
734                                                      ouc->ouc_data, 0, rc);
735                         osp_update_callback_fini(env, ouc);
736                 }
737                 osp_update_request_destroy(our);
738         } else {
739                 args = ptlrpc_req_async_args(req);
740                 args->oaua_update = our;
741                 args->oaua_count = NULL;
742                 args->oaua_waitq = NULL;
743                 /* Note: this is asynchronous call for the request, so the
744                  * interrupte cb and current function will be different
745                  * thread, so we need use different env */
746                 args->oaua_update_env = NULL;
747                 args->oaua_flow_control = false;
748                 req->rq_interpret_reply = osp_update_interpret;
749                 ptlrpcd_add_req(req);
750         }
751
752         return rc;
753 }
754
755 /**
756  * Find or create (if NOT exist or purged) the shared asynchronous idempotent
757  * request queue - osp_device::opd_async_requests.
758  *
759  * If the osp_device::opd_async_requests is not NULL, then return it directly;
760  * otherwise create new osp_update_request and attach it to opd_async_requests.
761  *
762  * \param[in] osp       pointer to the OSP device
763  *
764  * \retval              pointer to the shared queue
765  * \retval              negative error number on failure
766  */
767 static struct osp_update_request *
768 osp_find_or_create_async_update_request(struct osp_device *osp)
769 {
770         struct osp_update_request *our = osp->opd_async_requests;
771
772         if (our != NULL)
773                 return our;
774
775         our = osp_update_request_create(&osp->opd_dt_dev);
776         if (IS_ERR(our))
777                 return our;
778
779         osp->opd_async_requests = our;
780
781         return our;
782 }
783
784 /**
785  * Insert an osp_update_callback into the osp_update_request.
786  *
787  * Insert an osp_update_callback to the osp_update_request. Usually each update
788  * in the osp_update_request will have one correspondent callback, and these
789  * callbacks will be called in rq_interpret_reply.
790  *
791  * \param[in] env               pointer to the thread context
792  * \param[in] obj               pointer to the operation target object
793  * \param[in] data              pointer to the data used by the interpreter
794  * \param[in] interpreter       pointer to the interpreter function
795  *
796  * \retval                      0 for success
797  * \retval                      negative error number on failure
798  */
799 int osp_insert_update_callback(const struct lu_env *env,
800                                struct osp_update_request *our,
801                                struct osp_object *obj, void *data,
802                                osp_update_interpreter_t interpreter)
803 {
804         struct osp_update_callback  *ouc;
805
806         ouc = osp_update_callback_init(obj, data, interpreter);
807         if (ouc == NULL)
808                 RETURN(-ENOMEM);
809
810         list_add_tail(&ouc->ouc_list, &our->our_cb_items);
811
812         return 0;
813 }
814
815 /**
816  * Insert an asynchronous idempotent request to the shared request queue that
817  * is attached to the osp_device.
818  *
819  * This function generates a new osp_async_request with the given parameters,
820  * then tries to insert the request into the osp_device-based shared request
821  * queue. If the queue is full, then triggers the packaged OUT RPC to purge
822  * the shared queue firstly, and then re-tries.
823  *
824  * NOTE: must hold the osp::opd_async_requests_mutex to serialize concurrent
825  *       osp_insert_async_request call from others.
826  *
827  * \param[in] env               pointer to the thread context
828  * \param[in] op                operation type, see 'enum update_type'
829  * \param[in] obj               pointer to the operation target
830  * \param[in] count             array size of the subsequent \a lens and \a bufs
831  * \param[in] lens              buffer length array for the subsequent \a bufs
832  * \param[in] bufs              the buffers to compose the request
833  * \param[in] data              pointer to the data used by the interpreter
834  * \param[in] repsize           how many bytes the caller allocated for \a data
835  * \param[in] interpreter       pointer to the interpreter function
836  *
837  * \retval                      0 for success
838  * \retval                      negative error number on failure
839  */
840 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
841                              struct osp_object *obj, int count,
842                              __u16 *lens, const void **bufs,
843                              void *data, __u32 repsize,
844                              osp_update_interpreter_t interpreter)
845 {
846         struct osp_device               *osp;
847         struct osp_update_request       *our;
848         struct object_update            *object_update;
849         size_t                          max_update_size;
850         struct object_update_request    *ureq;
851         struct osp_update_request_sub   *ours;
852         int                             rc = 0;
853         ENTRY;
854
855         osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
856         our = osp_find_or_create_async_update_request(osp);
857         if (IS_ERR(our))
858                 RETURN(PTR_ERR(our));
859
860 again:
861         ours = osp_current_object_update_request(our);
862
863         ureq = ours->ours_req;
864         max_update_size = ours->ours_req_size -
865                           object_update_request_size(ureq);
866
867         object_update = update_buffer_get_update(ureq, ureq->ourq_count);
868         rc = out_update_pack(env, object_update, &max_update_size, op,
869                              lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
870                              repsize);
871         /* The queue is full. */
872         if (rc == -E2BIG) {
873                 osp->opd_async_requests = NULL;
874                 mutex_unlock(&osp->opd_async_requests_mutex);
875
876                 rc = osp_unplug_async_request(env, osp, our);
877                 mutex_lock(&osp->opd_async_requests_mutex);
878                 if (rc != 0)
879                         RETURN(rc);
880
881                 our = osp_find_or_create_async_update_request(osp);
882                 if (IS_ERR(our))
883                         RETURN(PTR_ERR(our));
884
885                 goto again;
886         } else {
887                 if (rc < 0)
888                         RETURN(rc);
889
890                 ureq->ourq_count++;
891                 our->our_update_nr++;
892         }
893
894         rc = osp_insert_update_callback(env, our, obj, data, interpreter);
895
896         RETURN(rc);
897 }
898
899 int osp_trans_update_request_create(struct thandle *th)
900 {
901         struct osp_thandle              *oth = thandle_to_osp_thandle(th);
902         struct osp_update_request       *our;
903
904         if (oth->ot_our != NULL)
905                 return 0;
906
907         our = osp_update_request_create(th->th_dev);
908         if (IS_ERR(our)) {
909                 th->th_result = PTR_ERR(our);
910                 return PTR_ERR(our);
911         }
912
913         oth->ot_our = our;
914         our->our_th = oth;
915
916         return 0;
917 }
918
919 void osp_thandle_destroy(struct osp_thandle *oth)
920 {
921         LASSERT(oth->ot_magic == OSP_THANDLE_MAGIC);
922         LASSERT(list_empty(&oth->ot_commit_dcb_list));
923         LASSERT(list_empty(&oth->ot_stop_dcb_list));
924         if (oth->ot_our != NULL)
925                 osp_update_request_destroy(oth->ot_our);
926         OBD_FREE_PTR(oth);
927 }
928
929 /**
930  * The OSP layer dt_device_operations::dt_trans_create() interface
931  * to create a transaction.
932  *
933  * There are two kinds of transactions that will involve OSP:
934  *
935  * 1) If the transaction only contains the updates on remote server
936  *    (MDT or OST), such as re-generating the lost OST-object for
937  *    LFSCK, then it is a remote transaction. For remote transaction,
938  *    the upper layer caller (such as the LFSCK engine) will call the
939  *    dt_trans_create() (with the OSP dt_device as the parameter),
940  *    then the call will be directed to the osp_trans_create() that
941  *    creates the transaction handler and returns it to the caller.
942  *
943  * 2) If the transcation contains both local and remote updates,
944  *    such as cross MDTs create under DNE mode, then the upper layer
945  *    caller will not trigger osp_trans_create(). Instead, it will
946  *    call dt_trans_create() on other dt_device, such as LOD that
947  *    will generate the transaction handler. Such handler will be
948  *    used by the whole transaction in subsequent sub-operations.
949  *
950  * \param[in] env       pointer to the thread context
951  * \param[in] d         pointer to the OSP dt_device
952  *
953  * \retval              pointer to the transaction handler
954  * \retval              negative error number on failure
955  */
956 struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
957 {
958         struct osp_thandle              *oth;
959         struct thandle                  *th = NULL;
960         ENTRY;
961
962         OBD_ALLOC_PTR(oth);
963         if (unlikely(oth == NULL))
964                 RETURN(ERR_PTR(-ENOMEM));
965
966         oth->ot_magic = OSP_THANDLE_MAGIC;
967         th = &oth->ot_super;
968         th->th_dev = d;
969         th->th_tags = LCT_TX_HANDLE;
970
971         atomic_set(&oth->ot_refcount, 1);
972         INIT_LIST_HEAD(&oth->ot_commit_dcb_list);
973         INIT_LIST_HEAD(&oth->ot_stop_dcb_list);
974
975         RETURN(th);
976 }
977
978 /**
979  * Add commit callback to transaction.
980  *
981  * Add commit callback to the osp thandle, which will be called
982  * when the thandle is committed remotely.
983  *
984  * \param[in] th        the thandle
985  * \param[in] dcb       commit callback structure
986  *
987  * \retval              only return 0 for now.
988  */
989 int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
990 {
991         struct osp_thandle *oth = thandle_to_osp_thandle(th);
992
993         LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
994         LASSERT(&dcb->dcb_func != NULL);
995         if (dcb->dcb_flags & DCB_TRANS_STOP)
996                 list_add(&dcb->dcb_linkage, &oth->ot_stop_dcb_list);
997         else
998                 list_add(&dcb->dcb_linkage, &oth->ot_commit_dcb_list);
999         return 0;
1000 }
1001
1002 static void osp_trans_commit_cb(struct osp_thandle *oth, int result)
1003 {
1004         struct dt_txn_commit_cb *dcb;
1005         struct dt_txn_commit_cb *tmp;
1006
1007         LASSERT(atomic_read(&oth->ot_refcount) > 0);
1008         /* call per-transaction callbacks if any */
1009         list_for_each_entry_safe(dcb, tmp, &oth->ot_commit_dcb_list,
1010                                  dcb_linkage) {
1011                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
1012                          "commit callback entry: magic=%x name='%s'\n",
1013                          dcb->dcb_magic, dcb->dcb_name);
1014                 list_del_init(&dcb->dcb_linkage);
1015                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
1016         }
1017 }
1018
1019 static void osp_request_commit_cb(struct ptlrpc_request *req)
1020 {
1021         struct thandle          *th = req->rq_cb_data;
1022         struct osp_thandle      *oth;
1023         __u64                   last_committed_transno = 0;
1024         int                     result = req->rq_status;
1025         ENTRY;
1026
1027         if (th == NULL)
1028                 RETURN_EXIT;
1029
1030         oth = thandle_to_osp_thandle(th);
1031         if (req->rq_repmsg != NULL &&
1032             lustre_msg_get_last_committed(req->rq_repmsg))
1033                 last_committed_transno =
1034                         lustre_msg_get_last_committed(req->rq_repmsg);
1035
1036         if (last_committed_transno <
1037                 req->rq_import->imp_peer_committed_transno)
1038                 last_committed_transno =
1039                         req->rq_import->imp_peer_committed_transno;
1040
1041         CDEBUG(D_HA, "trans no "LPU64" committed transno "LPU64"\n",
1042                req->rq_transno, last_committed_transno);
1043
1044         /* If the transaction is not really committed, mark result = 1 */
1045         if (req->rq_transno != 0 &&
1046             (req->rq_transno > last_committed_transno) && result == 0)
1047                 result = 1;
1048
1049         osp_trans_commit_cb(oth, result);
1050         req->rq_committed = 1;
1051         osp_thandle_put(oth);
1052         EXIT;
1053 }
1054
1055 /**
1056  * callback of osp transaction
1057  *
1058  * Call all of callbacks for this osp thandle. This will only be
1059  * called in error handler path. In the normal processing path,
1060  * these callback will be called in osp_request_commit_cb() and
1061  * osp_update_interpret().
1062  *
1063  * \param [in] env      execution environment
1064  * \param [in] oth      osp thandle
1065  * \param [in] rc       result of the osp thandle
1066  */
1067 void osp_trans_callback(const struct lu_env *env,
1068                         struct osp_thandle *oth, int rc)
1069 {
1070         struct osp_update_callback *ouc;
1071         struct osp_update_callback *next;
1072
1073         if (oth->ot_our != NULL) {
1074                 list_for_each_entry_safe(ouc, next,
1075                                          &oth->ot_our->our_cb_items, ouc_list) {
1076                         list_del_init(&ouc->ouc_list);
1077                         if (ouc->ouc_interpreter != NULL)
1078                                 ouc->ouc_interpreter(env, NULL, NULL,
1079                                                      ouc->ouc_obj,
1080                                                      ouc->ouc_data, 0, rc);
1081                         osp_update_callback_fini(env, ouc);
1082                 }
1083         }
1084         osp_trans_stop_cb(env, oth, rc);
1085         osp_trans_commit_cb(oth, rc);
1086 }
1087
1088 /**
1089  * Send the request for remote updates.
1090  *
1091  * Send updates to the remote MDT. Prepare the request by osp_update_req
1092  * and send them to remote MDT, for sync request, it will wait
1093  * until the reply return, otherwise hand it to ptlrpcd.
1094  *
1095  * Please refer to osp_trans_create() for transaction type.
1096  *
1097  * \param[in] env               pointer to the thread context
1098  * \param[in] osp               pointer to the OSP device
1099  * \param[in] our               pointer to the osp_update_request
1100  *
1101  * \retval                      0 for success
1102  * \retval                      negative error number on failure
1103  */
1104 static int osp_send_update_req(const struct lu_env *env,
1105                                struct osp_device *osp,
1106                                struct osp_update_request *our)
1107 {
1108         struct osp_update_args  *args;
1109         struct ptlrpc_request   *req;
1110         struct lu_device *top_device;
1111         struct osp_thandle      *oth = our->our_th;
1112         int     rc = 0;
1113         ENTRY;
1114
1115         LASSERT(oth != NULL);
1116         rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
1117                                  our, &req);
1118         if (rc != 0) {
1119                 osp_trans_callback(env, oth, rc);
1120                 RETURN(rc);
1121         }
1122
1123         args = ptlrpc_req_async_args(req);
1124         args->oaua_update = our;
1125         /* set env to NULL, in case the interrupt cb and current function
1126          * are in different thread */
1127         args->oaua_update_env = NULL;
1128         osp_thandle_get(oth); /* hold for update interpret */
1129         req->rq_interpret_reply = osp_update_interpret;
1130         if (!oth->ot_super.th_wait_submit && !oth->ot_super.th_sync) {
1131                 if (!osp->opd_imp_active || !osp->opd_imp_connected) {
1132                         osp_trans_callback(env, oth, rc);
1133                         osp_thandle_put(oth);
1134                         GOTO(out, rc = -ENOTCONN);
1135                 }
1136
1137                 rc = obd_get_request_slot(&osp->opd_obd->u.cli);
1138                 if (rc != 0) {
1139                         osp_trans_callback(env, oth, rc);
1140                         osp_thandle_put(oth);
1141                         GOTO(out, rc = -ENOTCONN);
1142                 }
1143                 args->oaua_flow_control = true;
1144
1145                 if (!osp->opd_connect_mdt) {
1146                         down_read(&osp->opd_async_updates_rwsem);
1147                         args->oaua_count = &osp->opd_async_updates_count;
1148                         args->oaua_waitq = &osp->opd_syn_barrier_waitq;
1149                         up_read(&osp->opd_async_updates_rwsem);
1150                         atomic_inc(args->oaua_count);
1151                 }
1152
1153                 ptlrpcd_add_req(req);
1154                 req = NULL;
1155         } else {
1156                 osp_thandle_get(oth); /* hold for commit callback */
1157                 req->rq_commit_cb = osp_request_commit_cb;
1158                 req->rq_cb_data = &oth->ot_super;
1159                 args->oaua_flow_control = false;
1160
1161                 /* If the transaction is created during MDT recoverying
1162                  * process, it means this is an recovery update, we need
1163                  * to let OSP send it anyway without checking recoverying
1164                  * status, in case the other target is being recoveried
1165                  * at the same time, and if we wait here for the import
1166                  * to be recoveryed, it might cause deadlock */
1167                 top_device = osp->opd_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
1168                 if (top_device->ld_obd->obd_recovering)
1169                         req->rq_allow_replay = 1;
1170
1171                 /* Because this req will be synchronus, i.e. it will be called
1172                  * in the same thread, so it will be safe to use current
1173                  * env */
1174                 args->oaua_update_env = env;
1175                 if (osp->opd_connect_mdt)
1176                         osp_get_rpc_lock(osp);
1177                 rc = ptlrpc_queue_wait(req);
1178                 if (osp->opd_connect_mdt)
1179                         osp_put_rpc_lock(osp);
1180                 if ((rc == -ENOMEM && req->rq_set == NULL) ||
1181                     (req->rq_transno == 0 && !req->rq_committed)) {
1182                         if (args->oaua_update != NULL) {
1183                                 /* If osp_update_interpret is not being called,
1184                                  * release the osp_thandle */
1185                                 args->oaua_update = NULL;
1186                                 osp_thandle_put(oth);
1187                         }
1188
1189                         req->rq_cb_data = NULL;
1190                         rc = rc == 0 ? req->rq_status : rc;
1191                         osp_trans_callback(env, oth, rc);
1192                         osp_thandle_put(oth);
1193                         GOTO(out, rc);
1194                 }
1195         }
1196 out:
1197         if (req != NULL)
1198                 ptlrpc_req_finished(req);
1199
1200         RETURN(rc);
1201 }
1202
1203 /**
1204  * Get local thandle for osp_thandle
1205  *
1206  * Get the local OSD thandle from the OSP thandle. Currently, there
1207  * are a few OSP API (osp_object_create() and osp_sync_add()) needs
1208  * to update the object on local OSD device.
1209  *
1210  * If the osp_thandle comes from normal stack (MDD->LOD->OSP), then
1211  * we will get local thandle by thandle_get_sub_by_dt.
1212  *
1213  * If the osp_thandle is remote thandle (th_top == NULL, only used
1214  * by LFSCK), then it will create a local thandle, and stop it in
1215  * osp_trans_stop(). And this only happens on OSP for OST.
1216  *
1217  * These are temporary solution, once OSP accessing OSD object is
1218  * being fixed properly, this function should be removed. XXX
1219  *
1220  * \param[in] env               pointer to the thread context
1221  * \param[in] th                pointer to the transaction handler
1222  * \param[in] dt                pointer to the OSP device
1223  *
1224  * \retval                      pointer to the local thandle
1225  * \retval                      ERR_PTR(errno) if it fails.
1226  **/
1227 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
1228                                         struct thandle *th,
1229                                         struct osp_device *osp)
1230 {
1231         struct osp_thandle      *oth;
1232         struct thandle          *local_th;
1233
1234         if (th->th_top != NULL)
1235                 return thandle_get_sub_by_dt(env, th->th_top,
1236                                              osp->opd_storage);
1237
1238         LASSERT(!osp->opd_connect_mdt);
1239         oth = thandle_to_osp_thandle(th);
1240         if (oth->ot_storage_th != NULL)
1241                 return oth->ot_storage_th;
1242
1243         local_th = dt_trans_create(env, osp->opd_storage);
1244         if (IS_ERR(local_th))
1245                 return local_th;
1246
1247         oth->ot_storage_th = local_th;
1248
1249         return local_th;
1250 }
1251
1252 /**
1253  * Set version for the transaction
1254  *
1255  * Set the version for the transaction and add the request to
1256  * the sending list, then after transaction stop, the request
1257  * will be picked in the order of version, by sending thread.
1258  *
1259  * \param [in] oth      osp thandle to be set version.
1260  *
1261  * \retval              0 if set version succeeds
1262  *                      negative errno if set version fails.
1263  */
1264 int osp_check_and_set_rpc_version(struct osp_thandle *oth,
1265                                   struct osp_object *obj)
1266 {
1267         struct osp_device *osp = dt2osp_dev(oth->ot_super.th_dev);
1268         struct osp_updates *ou = osp->opd_update;
1269
1270         if (ou == NULL)
1271                 return -EIO;
1272
1273         if (oth->ot_our->our_version != 0)
1274                 return 0;
1275
1276         spin_lock(&ou->ou_lock);
1277         spin_lock(&oth->ot_our->our_list_lock);
1278         if (obj->opo_stale) {
1279                 spin_unlock(&oth->ot_our->our_list_lock);
1280                 spin_unlock(&ou->ou_lock);
1281                 return -ESTALE;
1282         }
1283
1284         /* Assign the version and add it to the sending list */
1285         osp_thandle_get(oth);
1286         oth->ot_our->our_version = ou->ou_version++;
1287         list_add_tail(&oth->ot_our->our_list,
1288                       &osp->opd_update->ou_list);
1289         oth->ot_our->our_req_ready = 0;
1290         spin_unlock(&oth->ot_our->our_list_lock);
1291         spin_unlock(&ou->ou_lock);
1292
1293         LASSERT(oth->ot_super.th_wait_submit == 1);
1294         CDEBUG(D_INFO, "%s: version "LPU64" oth:version %p:"LPU64"\n",
1295                osp->opd_obd->obd_name, ou->ou_version, oth,
1296                oth->ot_our->our_version);
1297
1298         return 0;
1299 }
1300
1301 /**
1302  * Get next OSP update request in the sending list
1303  * Get next OSP update request in the sending list by version number, next
1304  * request will be
1305  * 1. transaction which does not have a version number.
1306  * 2. transaction whose version == opd_rpc_version.
1307  *
1308  * \param [in] ou       osp update structure.
1309  * \param [out] ourp    the pointer holding the next update request.
1310  *
1311  * \retval              true if getting the next transaction.
1312  * \retval              false if not getting the next transaction.
1313  */
1314 static bool
1315 osp_get_next_request(struct osp_updates *ou, struct osp_update_request **ourp)
1316 {
1317         struct osp_update_request *our;
1318         struct osp_update_request *tmp;
1319         bool                    got_req = false;
1320
1321         spin_lock(&ou->ou_lock);
1322         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
1323                 LASSERT(our->our_th != NULL);
1324                 CDEBUG(D_HA, "ou %p version "LPU64" rpc_version "LPU64"\n",
1325                        ou, our->our_version, ou->ou_rpc_version);
1326                 spin_lock(&our->our_list_lock);
1327                 /* Find next osp_update_request in the list */
1328                 if (our->our_version == ou->ou_rpc_version &&
1329                     our->our_req_ready) {
1330                         list_del_init(&our->our_list);
1331                         spin_unlock(&our->our_list_lock);
1332                         *ourp = our;
1333                         got_req = true;
1334                         break;
1335                 }
1336                 spin_unlock(&our->our_list_lock);
1337         }
1338         spin_unlock(&ou->ou_lock);
1339
1340         return got_req;
1341 }
1342
1343 /**
1344  * Invalidate update request
1345  *
1346  * Invalidate update request in the OSP sending list, so all of
1347  * requests in the sending list will return error, which happens
1348  * when it finds one update (with writing llog) requests fails or
1349  * the OSP is evicted by remote target. see osp_send_update_thread().
1350  *
1351  * \param[in] osp       OSP device whose update requests will be
1352  *                      invalidated.
1353  **/
1354 void osp_invalidate_request(struct osp_device *osp)
1355 {
1356         struct lu_env env;
1357         struct osp_updates *ou = osp->opd_update;
1358         struct osp_update_request *our;
1359         struct osp_update_request *tmp;
1360         LIST_HEAD(list);
1361         int                     rc;
1362         ENTRY;
1363
1364         if (ou == NULL)
1365                 return;
1366
1367         rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
1368         if (rc < 0) {
1369                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
1370                        rc);
1371                 return;
1372         }
1373
1374         INIT_LIST_HEAD(&list);
1375
1376         spin_lock(&ou->ou_lock);
1377         /* invalidate all of request in the sending list */
1378         list_for_each_entry_safe(our, tmp, &ou->ou_list, our_list) {
1379                 spin_lock(&our->our_list_lock);
1380                 if (our->our_req_ready)
1381                         list_move(&our->our_list, &list);
1382                 else
1383                         list_del_init(&our->our_list);
1384
1385                 if (our->our_th->ot_super.th_result == 0)
1386                         our->our_th->ot_super.th_result = -EIO;
1387
1388                 if (our->our_version >= ou->ou_rpc_version)
1389                         ou->ou_rpc_version = our->our_version + 1;
1390                 spin_unlock(&our->our_list_lock);
1391
1392                 CDEBUG(D_HA, "%s invalidate our %p\n", osp->opd_obd->obd_name,
1393                        our);
1394         }
1395
1396         spin_unlock(&ou->ou_lock);
1397
1398         /* invalidate all of request in the sending list */
1399         list_for_each_entry_safe(our, tmp, &list, our_list) {
1400                 spin_lock(&our->our_list_lock);
1401                 list_del_init(&our->our_list);
1402                 spin_unlock(&our->our_list_lock);
1403                 osp_trans_callback(&env, our->our_th,
1404                                    our->our_th->ot_super.th_result);
1405                 osp_thandle_put(our->our_th);
1406         }
1407         lu_env_fini(&env);
1408 }
1409
1410 /**
1411  * Sending update thread
1412  *
1413  * Create thread to send update request to other MDTs, this thread will pull
1414  * out update request from the list in OSP by version number, i.e. it will
1415  * make sure the update request with lower version number will be sent first.
1416  *
1417  * \param[in] arg       hold the OSP device.
1418  *
1419  * \retval              0 if the thread is created successfully.
1420  * \retal               negative error if the thread is not created
1421  *                      successfully.
1422  */
1423 int osp_send_update_thread(void *arg)
1424 {
1425         struct lu_env           env;
1426         struct osp_device       *osp = arg;
1427         struct l_wait_info       lwi = { 0 };
1428         struct osp_updates      *ou = osp->opd_update;
1429         struct ptlrpc_thread    *thread = &osp->opd_update_thread;
1430         struct osp_update_request *our = NULL;
1431         int                     rc;
1432         ENTRY;
1433
1434         LASSERT(ou != NULL);
1435         rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
1436         if (rc < 0) {
1437                 CERROR("%s: init env error: rc = %d\n", osp->opd_obd->obd_name,
1438                        rc);
1439                 RETURN(rc);
1440         }
1441
1442         thread->t_flags = SVC_RUNNING;
1443         wake_up(&thread->t_ctl_waitq);
1444         while (1) {
1445                 our = NULL;
1446                 l_wait_event(ou->ou_waitq,
1447                              !osp_send_update_thread_running(osp) ||
1448                              osp_get_next_request(ou, &our), &lwi);
1449
1450                 if (!osp_send_update_thread_running(osp)) {
1451                         if (our != NULL) {
1452                                 osp_trans_callback(&env, our->our_th, -EINTR);
1453                                 osp_thandle_put(our->our_th);
1454                         }
1455                         break;
1456                 }
1457
1458                 LASSERT(our->our_th != NULL);
1459                 if (our->our_th->ot_super.th_result != 0) {
1460                         osp_trans_callback(&env, our->our_th,
1461                                 our->our_th->ot_super.th_result);
1462                         rc = our->our_th->ot_super.th_result;
1463                 } else if (OBD_FAIL_CHECK(OBD_FAIL_INVALIDATE_UPDATE)) {
1464                         rc = -EIO;
1465                         osp_trans_callback(&env, our->our_th, rc);
1466                 } else {
1467                         rc = osp_send_update_req(&env, osp, our);
1468                 }
1469
1470                 /* Update the rpc version */
1471                 spin_lock(&ou->ou_lock);
1472                 if (our->our_version == ou->ou_rpc_version)
1473                         ou->ou_rpc_version++;
1474                 spin_unlock(&ou->ou_lock);
1475
1476                 /* If one update request fails, let's fail all of the requests
1477                  * in the sending list, because the request in the sending
1478                  * list are dependent on either other, continue sending these
1479                  * request might cause llog or filesystem corruption */
1480                 if (rc < 0)
1481                         osp_invalidate_request(osp);
1482
1483                 /* Balanced for thandle_get in osp_check_and_set_rpc_version */
1484                 osp_thandle_put(our->our_th);
1485         }
1486
1487         thread->t_flags = SVC_STOPPED;
1488         lu_env_fini(&env);
1489         wake_up(&thread->t_ctl_waitq);
1490
1491         RETURN(0);
1492 }
1493
1494 /**
1495  * The OSP layer dt_device_operations::dt_trans_start() interface
1496  * to start the transaction.
1497  *
1498  * If the transaction is a remote transaction, then related remote
1499  * updates will be triggered in the osp_trans_stop().
1500  * Please refer to osp_trans_create() for transaction type.
1501  *
1502  * \param[in] env               pointer to the thread context
1503  * \param[in] dt                pointer to the OSP dt_device
1504  * \param[in] th                pointer to the transaction handler
1505  *
1506  * \retval                      0 for success
1507  * \retval                      negative error number on failure
1508  */
1509 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
1510                     struct thandle *th)
1511 {
1512         struct osp_thandle      *oth = thandle_to_osp_thandle(th);
1513
1514         if (oth->ot_super.th_sync)
1515                 oth->ot_our->our_flags |= UPDATE_FL_SYNC;
1516         /* For remote thandle, if there are local thandle, start it here*/
1517         if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
1518                 return dt_trans_start(env, oth->ot_storage_th->th_dev,
1519                                       oth->ot_storage_th);
1520         return 0;
1521 }
1522
1523 /**
1524  * The OSP layer dt_device_operations::dt_trans_stop() interface
1525  * to stop the transaction.
1526  *
1527  * If the transaction is a remote transaction, related remote
1528  * updates will be triggered at the end of this function.
1529  *
1530  * For synchronous mode update or any failed update, the request
1531  * will be destroyed explicitly when the osp_trans_stop().
1532  *
1533  * Please refer to osp_trans_create() for transaction type.
1534  *
1535  * \param[in] env               pointer to the thread context
1536  * \param[in] dt                pointer to the OSP dt_device
1537  * \param[in] th                pointer to the transaction handler
1538  *
1539  * \retval                      0 for success
1540  * \retval                      negative error number on failure
1541  */
1542 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
1543                    struct thandle *th)
1544 {
1545         struct osp_thandle       *oth = thandle_to_osp_thandle(th);
1546         struct osp_update_request *our = oth->ot_our;
1547         struct osp_device        *osp = dt2osp_dev(dt);
1548         int                      rc = 0;
1549         ENTRY;
1550
1551         /* For remote transaction, if there is local storage thandle,
1552          * stop it first */
1553         if (oth->ot_storage_th != NULL && th->th_top == NULL) {
1554                 dt_trans_stop(env, oth->ot_storage_th->th_dev,
1555                               oth->ot_storage_th);
1556                 oth->ot_storage_th = NULL;
1557         }
1558
1559         if (our == NULL || list_empty(&our->our_req_list)) {
1560                 osp_trans_callback(env, oth, th->th_result);
1561                 GOTO(out, rc = th->th_result);
1562         }
1563
1564         if (!osp->opd_connect_mdt) {
1565                 osp_trans_callback(env, oth, th->th_result);
1566                 rc = osp_send_update_req(env, osp, oth->ot_our);
1567                 GOTO(out, rc);
1568         }
1569
1570         if (osp->opd_update == NULL ||
1571             !osp_send_update_thread_running(osp)) {
1572                 osp_trans_callback(env, oth, -EIO);
1573                 GOTO(out, rc = -EIO);
1574         }
1575
1576         CDEBUG(D_HA, "%s: add oth %p with version "LPU64"\n",
1577                osp->opd_obd->obd_name, oth, our->our_version);
1578
1579         LASSERT(our->our_req_ready == 0);
1580         spin_lock(&our->our_list_lock);
1581         if (likely(!list_empty(&our->our_list))) {
1582                 /* notify sending thread */
1583                 our->our_req_ready = 1;
1584                 wake_up(&osp->opd_update->ou_waitq);
1585                 spin_unlock(&our->our_list_lock);
1586         } else if (th->th_result == 0) {
1587                 /* if the request does not needs to be serialized,
1588                  * read-only request etc, let's send it right away */
1589                 spin_unlock(&our->our_list_lock);
1590                 rc = osp_send_update_req(env, osp, our);
1591         } else {
1592                 spin_unlock(&our->our_list_lock);
1593                 osp_trans_callback(env, oth, th->th_result);
1594         }
1595 out:
1596         osp_thandle_put(oth);
1597
1598         RETURN(rc);
1599 }