/***************************************************************************** * GIOP Promela Model * * * vim:tabstop=4 noexpandtab: * $Id: giop3.pr,v 1.11 1999/10/04 21:15:18 m2kamel Exp m2kamel $ * * TODO: * * Copyright (C) 1998 by Moataz Kamel. All rights reserved. *****************************************************************************/ #define CHANLEN 5 /* default channel length (5) */ #define MAXREQID 4 /* maximum number for request_id (4) */ #define NUMOBJS 2 /* number of objects (2) */ #define NUMPORTS 3 /* number of ports = no. of transport procs */ #define MAXMIGRATIONS 2 /* maximum number of times server can migrate */ #define FALSE 0 #define TRUE 1 #define FREE 0 /* indicates a resource is unused */ #define INUSE 1 /* indicates a resource is being used */ #define CANCELLED 2 /* indicates a request has been cancelled */ #define INVALID 255 /* set invalid data as garbage (i.e 255) */ typedef ObjRef { byte objKey; byte port; } /* Define the table of published IORs for all ORBS * This is essentially the Name Server, it should be * managed by a dedicated process but for now, it is * shared. * The table is indexed by objKey and contains the port * of the ORB where the object was most recently registered. */ byte gPublished[NUMOBJS] = INVALID; /* Note: this inits all array values */ /* Message types */ mtype = { /* GIOP message types */ Request, Reply, CancelRequest, CloseConnection, /* User message types */ URequest, UReply, /* Server message types */ SRegister, SRequest, SReply, SMigrateReq, }; /* The GIOP message header structure */ /*typedef GIOPHeader { * byte message_type; * byte message_size; *} */ /* The MsgHeader structure aggregates all the fields from all types of GIOP message headers */ typedef MsgHeader { byte request_id; /* Request, Reply, CancelRequest, */ /* LocateRequest, LocateReply */ byte object_key; /* Request, LocateRequest, */ /* bit response_expected; /* Request */ /* byte operation; /* Request */ byte reply_status; /* Reply */ byte forward_port; /* Reply -- LOCATION_FORWARD address */ /* byte locate_status; /* LocateReply */ byte tag; } /* Most GIOP messages consist of three parts: 1) the GIOP header (contains the message type), 2) the specific message header associated with the message type, and 3) the message body (contains the actual data of the message). (the third part is unneccessary for the validation model) */ typedef GIOPMsg { /* GIOPHeader hdr;*/ MsgHeader mhdr; } /* reply_status return codes */ #define NO_EXCEPTION 1 #define USER_EXCEPTION 2 #define SYSTEM_EXCEPTION 3 #define LOCATION_FORWARD 4 #define OBJECT_NOT_EXIST 5 /* locate_status return codes */ #define UNKNOWN_OBJECT 1 #define OBJECT_HERE 2 #define OBJECT_FORWARD 3 #define AGENT_CLOSED 0 #define AGENT_CONNECTED 1 /* Global variables for LTL verification */ byte user6Processed = 0; byte user7Processed = 0; bit srequested[MAXREQID] = FALSE; byte srequest_reqId = INVALID; byte sreply_reqId = INVALID; byte request_reqId = INVALID; byte reply_reqId = INVALID; byte numRequestsRcvd[NUMPORTS] = 0; byte numRepliesSent[NUMPORTS] = 0; #define NUMPROCESSES 10 byte mypid[NUMPROCESSES] = 0; /*--------------------------------------------------------------------------*/ chan toUser = [CHANLEN] of {mtype, byte /*tag*/, byte /*status*/}; proctype User(chan lin, lout) { byte tag = _pid; /* use our pid as a tag value */ byte status; ObjRef objref; byte i; /* Block until all objRefs have been published */ atomic{ i = 0; do :: ( i < NUMOBJS ) -> gPublished[i] != INVALID; /* blocks until it's true */ i = i + 1; :: ( i == NUMOBJS ) -> break; od; i = 0; } /* Get a valid object reference from the published table and * use it to make a request. Note: the objref is chosen randomly * from among the published objRefs. */ atomic{ i = 0; do :: ( i <= NUMOBJS-1 ) -> objref.objKey = i; objref.port = gPublished[i]; /* choose this one */ break; :: ( i < NUMOBJS-1 ) -> i = i + 1; od; i = 0; } /* send a user request on the object */ lout!URequest(tag,objref); URequestSent: /* Wait for a reply and only receive it if it has the same tag * (i.e. originated from this instance of the user) */ lin?UReply(eval(tag), status); progress: UReplyReceived: if ::( status == NO_EXCEPTION ) -> NoException: printf("Request satisfied\n"); ::( status == USER_EXCEPTION ) -> UserException: printf("Request failed: user exception\n"); ::( status == SYSTEM_EXCEPTION ) -> SystemException: printf("Request failed: system exception\n"); ::( status == OBJECT_NOT_EXIST ) -> printf("Request failed: object does not exist\n"); fi } /*--------------------------------------------------------------------------*/ chan toClientL = [0] of {mtype, byte /*svrPort*/, GIOPMsg}; chan toClientU = [CHANLEN] of {mtype, byte /*tag*/, ObjRef}; /* The usedReqId is global so that it can be available for LTL formulas */ /* but, it is only changed by the GIOPClient, so should really be a local */ byte usedReqId[MAXREQID]; /*= FREE;*/ /* keeps track of request_ids in-use */ proctype GIOPClient(chan uin, uout, lin, lout) { byte tags[MAXREQID] = INVALID; /* associates tags to request_ids */ ObjRef objRefs[MAXREQID] = INVALID; /* saves objRefs in case of CloseConnection */ byte reqId; byte tag; ObjRef objref; GIOPMsg msg; byte svrPort; end: do :: uin?URequest(tag,objref) -> /* Received a URequest message from the user * tag identifies the User process that made the request * objref is the object reference containing the destination port */ d_step { svrPort = objref.port; /* find a free request_id by scanning the usedReqId array */ reqId = 0; } do :: (usedReqId[reqId] != FREE) -> reqId = (reqId + 1); assert(reqId < MAXREQID) /* all reqIds INUSE */ :: (usedReqId[reqId] == FREE) -> break; od; /* found a free request_id, mark it as INUSE */ usedReqId[reqId] = INUSE; d_step { /* save the tag and objref in case of CloseConnection */ tags[reqId] = tag; objRefs[reqId].objKey = objref.objKey; objRefs[reqId].port = objref.port; /* build and send the request message */ msg.mhdr.request_id = reqId; msg.mhdr.object_key = objref.objKey; msg.mhdr.tag = tag; request_reqId = reqId; /* for validation */ } /* send the request */ lout!Request(svrPort, msg); RequestSent: /* randomly choose to cancel the sent request or just continue */ if :: (1) -> /* do nothing */ :: (1) -> /* The Message contents are the same as the request, The important * thing is the request_id needs to be the same as the request */ lout!CancelRequest(svrPort, msg); CancelSent: usedReqId[reqId] = CANCELLED; /* send a reply to the user indicating an exception */ uout!UReply(tag, SYSTEM_EXCEPTION); fi; :: lin?Reply(svrPort, msg) -> /* It is an error to receive a reply for something that is not * either pending or cancelled. */ d_step { reqId = msg.mhdr.request_id; assert( usedReqId[reqId] != FREE ); reply_reqId = reqId; /* for validation */ } ReplyRecvd: if :: (usedReqId[reqId] == INUSE) -> /* normal case, request exists */ if :: ( msg.mhdr.reply_status == LOCATION_FORWARD ) -> /* re-send the request to the new forward address */ /* save the new port in case of Close */ d_step{ objRefs[reqId].port = msg.mhdr.forward_port; } lout!Request(objRefs[reqId].port,msg); :: else -> /* send the reply to the user */ uout!UReply(tags[reqId], msg.mhdr.reply_status); /* free the request_id */ d_step{ usedReqId[reqId] = FREE; tags[reqId] = INVALID; objRefs[reqId].port = INVALID; objRefs[reqId].objKey = INVALID; } fi; :: (usedReqId[reqId] == CANCELLED) -> /* request was previously cancelled, so just free the request_id */ d_step{ usedReqId[reqId] = FREE; tags[reqId] = INVALID; objRefs[reqId].port = INVALID; objRefs[reqId].objKey = INVALID; } fi; :: lin?CloseConnection(svrPort, msg) -> /* If a client receives a CloseConnection message from the server, it * should assume that any outstanding messages (i.e. without replies) * were received after the server sent the CloseConnection message, * were not processed, and may be safely resent on a new connection. * (12-31) */ /* find all outstanding requests for the given server */ reqId = 0; do :: (reqId == MAXREQID) -> break :: (reqId != MAXREQID) -> if :: (objRefs[reqId].port == svrPort) -> /* only consider outstanding requests to the given server */ if :: (usedReqId[reqId] == INUSE) -> /* This request has not received a reply. * Re-send the request on a new connection. * (the transport will have made a new connection) */ /* build and send the request */ d_step{ msg.mhdr.request_id = reqId; msg.mhdr.object_key = objRefs[reqId].objKey; msg.mhdr.tag = tags[reqId]; } lout!Request(objRefs[reqId].port,msg); :: (usedReqId[reqId] == CANCELLED) -> /* This request has not received a reply but * the user is not expecting one. Just free it. */ d_step{ usedReqId[reqId] = FREE; tags[reqId] = INVALID; objRefs[reqId].port = INVALID; objRefs[reqId].objKey = INVALID; } :: (usedReqId[reqId] == FREE) -> /* do nothing */ fi; :: (objRefs[reqId].port != svrPort) -> /* do nothing */ fi; reqId = reqId + 1 od od } /*--------------------------------------------------------------------------*/ /* Define the channel; all servers share the same channel */ chan toServer[NUMPORTS] = [CHANLEN] of {mtype, byte /*objkey*/, byte /*opaqueData*/, byte /*opaqueData2*/, byte /* tag */}; chan toAgentU[NUMPORTS] = [0] of {mtype, byte /*objKey*/, byte /*opaqueData*/, byte /*opaqueData2*/}; chan toAgentL[NUMPORTS] = [0] of {mtype, byte /*clPort*/, GIOPMsg}; proctype Server(byte port, objKey) { byte opaqueData; byte opaqueData2; byte tag; byte newport; byte numMigrations = 0; /* Each server process represents a single server object; * the object_key is a unique ID for each server object */ toAgentU[port]!SRegister(objKey,port,0); end: do :: toServer[port]?SRequest(eval(objKey),opaqueData,opaqueData2,tag) -> /* for validation */ if :: (tag == mypid[6]) -> user6Processed = user6Processed + 1; User6Processed1: skip; :: (tag == mypid[7]) -> user7Processed = user7Processed + 1; User7Processed1: skip; :: else -> fi; /* end validation */ /* send the reply */ toAgentU[port]!SReply(objKey,opaqueData,opaqueData2) :: (numMigrations < MAXMIGRATIONS) -> /* Initiate server migration. * The migration destination is hardcoded, thus if more agents are * added this code would have to be changed... * Perhaps there is a better way? */ numMigrations = numMigrations + 1; if :: (port == 1) -> newport = 2 :: (port == 2) -> newport = 1 :: else -> assert(0); fi; /* First Register with the new Agent. */ toAgentU[newport]!SRegister(objKey,port,0); /* Tell the old agent that we're moving and tell it our * new port so it can forward requests to us */ toAgentU[port]!SMigrateReq(objKey,newport,0); /* Agent has completed the MigrateRequest, so we can complete the * migration i.e change our port... * But first, we need to clear out any SRequests that arrived after * initiation but before completion of the migration. */ do :: (1) -> if :: toServer[port]??[SRequest,eval(objKey)] -> toServer[port]??SRequest(eval(objKey),opaqueData,opaqueData2,tag); /* for validation */ if :: (tag == mypid[6]) -> user6Processed = user6Processed + 1; User6Processed2: skip; :: (tag == mypid[7]) -> user7Processed = user7Processed + 1; User7Processed2: skip; :: else -> fi; /* end validation */ toAgentU[port]!SReply(objKey,opaqueData,opaqueData2) :: else -> break; fi od; /* migration complete */ port = newport; od } /*--------------------------------------------------------------------------*/ proctype GIOPAgent(byte port; chan uin, uout, lin, lout) { byte requested[MAXREQID] = FREE; /* keeps track of outstanding reqIds */ bit connState = AGENT_CLOSED; byte numOutstandingReqs = 0; byte registered[NUMOBJS] = INVALID; /* registered object keys */ byte reqId; byte objKey; GIOPMsg msg; byte clPort; end: do :: uin?SRegister(objKey,_,_) -> /* Publish our address (port) as the ORB to contact for this objKey */ d_step{ gPublished[objKey] = port; /* Save the server's objkey to be able to direct requests to it. * The registered array holds the current port at which the object * is registered since objects can migrate between agents. */ registered[objKey] = port; } :: uin?SMigrateReq(objKey,clPort,_) -> /* The Server is informing us that it is migrating to another agent. * Save the new port so that if we get a request for the object we can * send a LOCATION_FORWARD reply with the new port. */ registered[objKey] = clPort; :: lin?Request(clPort, msg) -> /* Got a request from a client: process it by sending an SRequest to the appropriate server or sending a LOCATION_FORWARD reply with the address of the appropriate server. */ d_step{ reqId = msg.mhdr.request_id; objKey = msg.mhdr.object_key; numRequestsRcvd[port] = numRequestsRcvd[port] + 1; } RequestRcvd: if :: ( registered[objKey] == port ) -> /* The server for the object is registered with this agent, so * send the request to the server. */ d_step{ connState = AGENT_CONNECTED; srequest_reqId = reqId; /* for validation */ } /* send server request */ uout!SRequest(objKey,reqId,clPort,msg.mhdr.tag); SRequestSent: d_step{ requested[reqId] = INUSE; numOutstandingReqs = numOutstandingReqs + 1; srequested[reqId] = TRUE; /* for validation */ } :: ( registered[objKey] != port ) -> if :: ( registered[objKey] == INVALID ) -> /* No such server is registered, send back an exception */ d_step { msg.mhdr.request_id = reqId; msg.mhdr.object_key = objKey; msg.mhdr.reply_status = OBJECT_NOT_EXIST; } :: else -> /* The server has migrated, so send back a LOCATION_FORWARD * reply code with the new address of the server. */ d_step{ msg.mhdr.request_id = reqId; msg.mhdr.object_key = objKey; msg.mhdr.reply_status = LOCATION_FORWARD; msg.mhdr.forward_port = registered[objKey]; } fi; /* send the Reply */ lout!Reply(clPort,msg); ReplySent1: numRepliesSent[port] = numRepliesSent[port] + 1; /* for validation */ fi; :: uin?SReply(objKey,reqId,clPort) -> /* Received server reply */ /* Note: Servers may reply to pending requests in any order. (12-31) */ /* It is an error to receive a SReply for a SRequest * that was not outstanding, this should never happen */ assert(requested[reqId] != FREE); sreply_reqId = reqId; /* for validation */ SReplyReceived: if :: (requested[reqId] == CANCELLED) -> /* the request was cancelled so just free the reqId and * don't bother sending the Reply to the Client */ d_step{ requested[reqId] = FREE; numOutstandingReqs = numOutstandingReqs - 1; srequested[reqId] = FALSE; /* for validation */ } :: (requested[reqId] == INUSE) -> d_step{ msg.mhdr.request_id = reqId; msg.mhdr.object_key = objKey; msg.mhdr.reply_status = NO_EXCEPTION; } /* send the Reply */ lout!Reply(clPort,msg); ReplySent2: d_step{ requested[reqId] = FREE; numOutstandingReqs = numOutstandingReqs - 1; srequested[reqId] = FALSE; /* for validation */ numRepliesSent[port] = numRepliesSent[port] + 1; /* for validation */ } fi; /* If this is the last outstanding request then * close the connection. */ if :: (numOutstandingReqs == 0) -> if :: (connState == AGENT_CONNECTED) -> /* send the close */ lout!CloseConnection(clPort,msg); CloseConnectionSent: d_step{ connState = AGENT_CLOSED; numRequestsRcvd[port] = 0; /* for validation */ numRepliesSent[port] = 0; /* for validation */ } :: (connState != AGENT_CONNECTED) -> /* do nothing */ fi :: (numOutstandingReqs != 0) -> /* do nothing */ fi; :: lin?CancelRequest(clPort, msg) -> reqId = msg.mhdr.request_id; if :: ( requested[reqId] == INUSE ) -> /* we already sent the request to the server, * so we mark it as cancelled and wait for the reply */ requested[reqId] = CANCELLED :: ( requested[reqId] == FREE ) -> /* too late, we already replied */ :: ( requested[reqId] == CANCELLED ) -> /* was previously cancelled so just ignore */ fi od } /*--------------------------------------------------------------------------*/ /* The transport process associates a connId to a message before it transmits * it. At the other end, the receiving transport process checks if the received * message belongs to the current connection, if not it just discards it. * ConnIds are updated and synchronized each time a CloseConnection message is * passed. * (Note: we must ensure that the CloseConnection message will not be * discarded) */ #define MAXCONNID 8 #define NEXTCONNID ((current_connId + 1) % MAXCONNID) /* Define the transport channels: Upper and Lower interfaces; * The channel array is indexed by port number */ chan toTransportU[NUMPORTS] = [0] of {mtype, byte /*port*/, GIOPMsg }; chan toTransportL[NUMPORTS] = [CHANLEN] of {mtype, byte /*connid*/, byte /*port*/, GIOPMsg }; proctype transport(byte port; chan uin, uout) { mtype msgType; chan savedmsg = [1] of {mtype, byte, byte, GIOPMsg}; GIOPMsg msg; byte srcport, dstport; byte rcvConnId; byte connId[NUMPORTS] = 0; /* keep track of the connId for each port */ end: do :: uin?msgType(dstport, msg) -> /* If we get a CloseConnection message from the upper interface, then * we simulate closing the connection by bumping up the connId. * If there are any 'old' messages in the channel they will be * discarded on reception. */ preempt: /* send it */ toTransportL[dstport]!msgType(connId[dstport], port, msg); if :: (msgType == CloseConnection) -> connId[dstport] = (connId[dstport] + 1) % MAXCONNID; /*NEXTCONNID;*/ :: (msgType != CloseConnection) -> skip fi; /* check if we got here because of a preemption, * if so, pop out the saved message and go back */ if :: savedmsg?[msgType,rcvConnId,srcport,msg] -> savedmsg?msgType(rcvConnId,srcport,msg); goto resume :: else -> skip fi :: toTransportL[port]?msgType(rcvConnId, srcport, msg) -> /* If we get a CloseConnection message from the lower interface, then * we simulate closing the connection by bumping up the connId. * If there are any 'old' messages in the channel they will be * discarded on reception. */ resume: if /* Ensure that the received connId matches the one that we have */ :: ( (rcvConnId == connId[srcport]) ) -> if :: uout!msgType(srcport, msg) /* forward the message */ :: timeout -> /* This clause gets enabled when deadlock occurs due to * a message arriving at the upper while we're processing a * message from the lower interface... deal with it by saving * the lower i/f message and processing the upper i/f message * first. Afterwards, we resume and process the lower i/f * message. */ savedmsg!msgType(rcvConnId,srcport,msg); uin?msgType(dstport, msg); goto preempt fi :: ( (rcvConnId != connId[srcport]) ) -> printf("Old message encountered -> discarded\n"); /* discard the message (i.e don't forward it) */ fi; if :: (msgType == CloseConnection) -> connId[srcport] = (connId[srcport] + 1) % MAXCONNID; /*NEXTCONNID;*/ :: (msgType != CloseConnection) -> /* do nothing */ fi od } /*--------------------------------------------------------------------------*/ init { /* create the processes from the bottom up */ atomic { mypid[0] = run transport(/*port*/ 0, toTransportU[0], toClientL ); mypid[1] = run transport(/*port*/ 1, toTransportU[1], toAgentL[1] ); mypid[2] = run transport(/*port*/ 2, toTransportU[2], toAgentL[2] ); mypid[3] = run GIOPClient( toClientU, toUser, toClientL, toTransportU[0]); mypid[4] = run GIOPAgent(/*port*/ 1, toAgentU[1], toServer[1], toAgentL[1], toTransportU[1] ); mypid[5] = run GIOPAgent(/*port*/ 2, toAgentU[2], toServer[2], toAgentL[2], toTransportU[2] ); mypid[6] = run User( toUser, toClientU ); mypid[7] = run User( toUser, toClientU ); mypid[8] = run Server(/*port*/ 1, /*objkey*/ 0 ); mypid[9] = run Server(/*port*/ 2, /*objkey*/ 1 ); } } /* #include "never/v3.never" */ /* #include "never/v4_long.never" */ /* #include "never/v4a.never" */ /* #include "never/v4b.never" */ /* #include "never/v5.never" */ /* #include "never/v6a.never" */ /* #include "never/v6b.never" */ /* #include "never/v7.never" */ /* #include "never/v8.never" */ /* #include "never/v9a.never" */ /* #include "never/v9b.never" */ /* #include "never/v10.never" */ /*=========================================================================*/