12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
7427842794280428142824283428442854286428742884289429042914292429342944295429642974298429943004301430243034304430543064307430843094310431143124313431443154316431743184319432043214322432343244325432643274328432943304331433243334334433543364337433843394340434143424343434443454346434743484349435043514352435343544355435643574358435943604361436243634364436543664367436843694370437143724373437443754376437743784379438043814382438343844385438643874388438943904391439243934394439543964397439843994400440144024403440444054406440744084409441044114412441344144415441644174418441944204421442244234424442544264427442844294430443144324433443444354436443744384439444044414442444344444445444644474448444944504451445244534454445544564457445844594460446144624463446444654466446744684469447044714472447344744475447644774478447944804481448244834484448544864487448844894490449144924493449444954496449744984499450045014502450345044505450645074508450945104511451245134514451545164517451845194520452145224523452445254526452745284529453045314532453345344535453645374538453945404541454245434544454545464547454845494550455145524553455445554556455745584559456045614562456345644565456645674568456945704571457245734574457545764577457845794580458145824583458445854586458745884589459045914592459345944595459645974598459946004601460246034604460546064607460846094610461146124613461446154616461746184619462046214622462346244625462646274628462946304631463246334634463546364637463846394640464146424643464446454646464746484649465046514652465346544655465646574658465946604661466246634664466546664667466846694670467146724673467446754676467746784679468046814682468346844685468646874688468946904691469246934694469546964697469846994700470147024703470447054706470747084709 |
- /*
- * SRT - Secure, Reliable, Transport
- * Copyright (c) 2018 Haivision Systems Inc.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- *
- */
- /*****************************************************************************
- Copyright (c) 2001 - 2011, The Board of Trustees of the University of Illinois.
- All rights reserved.
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are
- met:
- * Redistributions of source code must retain the above
- copyright notice, this list of conditions and the
- following disclaimer.
- * Redistributions in binary form must reproduce the
- above copyright notice, this list of conditions
- and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
- * Neither the name of the University of Illinois
- nor the names of its contributors may be used to
- endorse or promote products derived from this
- software without specific prior written permission.
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
- IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
- THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
- PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
- CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
- EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
- PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
- PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
- LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *****************************************************************************/
- /*****************************************************************************
- written by
- Yunhong Gu, last updated 07/09/2011
- modified by
- Haivision Systems Inc.
- *****************************************************************************/
- #include "platform_sys.h"
- #include <exception>
- #include <stdexcept>
- #include <typeinfo>
- #include <iterator>
- #include <vector>
- #include <cstring>
- #include "utilities.h"
- #include "netinet_any.h"
- #include "api.h"
- #include "core.h"
- #include "epoll.h"
- #include "logging.h"
- #include "threadname.h"
- #include "srt.h"
- #include "udt.h"
- #ifdef _WIN32
- #include <win/wintime.h>
- #endif
- #ifdef _MSC_VER
- #pragma warning(error : 4530)
- #endif
- using namespace std;
- using namespace srt_logging;
- using namespace srt::sync;
- void srt::CUDTSocket::construct()
- {
- #if ENABLE_BONDING
- m_GroupOf = NULL;
- m_GroupMemberData = NULL;
- #endif
- setupMutex(m_AcceptLock, "Accept");
- setupCond(m_AcceptCond, "Accept");
- setupMutex(m_ControlLock, "Control");
- }
- srt::CUDTSocket::~CUDTSocket()
- {
- releaseMutex(m_AcceptLock);
- releaseCond(m_AcceptCond);
- releaseMutex(m_ControlLock);
- }
- SRT_SOCKSTATUS srt::CUDTSocket::getStatus()
- {
- // TTL in CRendezvousQueue::updateConnStatus() will set m_bConnecting to false.
- // Although m_Status is still SRTS_CONNECTING, the connection is in fact to be closed due to TTL expiry.
- // In this case m_bConnected is also false. Both checks are required to avoid hitting
- // a regular state transition from CONNECTING to CONNECTED.
- if (m_UDT.m_bBroken)
- return SRTS_BROKEN;
- // Connecting timed out
- if ((m_Status == SRTS_CONNECTING) && !m_UDT.m_bConnecting && !m_UDT.m_bConnected)
- return SRTS_BROKEN;
- return m_Status;
- }
- // [[using locked(m_GlobControlLock)]]
- void srt::CUDTSocket::breakSocket_LOCKED()
- {
- // This function is intended to be called from GC,
- // under a lock of m_GlobControlLock.
- m_UDT.m_bBroken = true;
- m_UDT.m_iBrokenCounter = 0;
- HLOGC(smlog.Debug, log << "@" << m_SocketID << " CLOSING AS SOCKET");
- m_UDT.closeInternal();
- setClosed();
- }
- void srt::CUDTSocket::setClosed()
- {
- m_Status = SRTS_CLOSED;
- // a socket will not be immediately removed when it is closed
- // in order to prevent other methods from accessing invalid address
- // a timer is started and the socket will be removed after approximately
- // 1 second
- m_tsClosureTimeStamp = steady_clock::now();
- }
- void srt::CUDTSocket::setBrokenClosed()
- {
- m_UDT.m_iBrokenCounter = 60;
- m_UDT.m_bBroken = true;
- setClosed();
- }
- bool srt::CUDTSocket::readReady()
- {
- // TODO: Use m_RcvBufferLock here (CUDT::isRcvReadReady())?
- if (m_UDT.m_bConnected && m_UDT.m_pRcvBuffer->isRcvDataReady())
- return true;
- if (m_UDT.m_bListening)
- return !m_QueuedSockets.empty();
- return broken();
- }
- bool srt::CUDTSocket::writeReady() const
- {
- return (m_UDT.m_bConnected && (m_UDT.m_pSndBuffer->getCurrBufSize() < m_UDT.m_config.iSndBufSize)) || broken();
- }
- bool srt::CUDTSocket::broken() const
- {
- return m_UDT.m_bBroken || !m_UDT.m_bConnected;
- }
- ////////////////////////////////////////////////////////////////////////////////
- srt::CUDTUnited::CUDTUnited()
- : m_Sockets()
- , m_GlobControlLock()
- , m_IDLock()
- , m_mMultiplexer()
- , m_MultiplexerLock()
- , m_pCache(NULL)
- , m_bClosing(false)
- , m_GCStopCond()
- , m_InitLock()
- , m_iInstanceCount(0)
- , m_bGCStatus(false)
- , m_ClosedSockets()
- {
- // Socket ID MUST start from a random value
- m_SocketIDGenerator = genRandomInt(1, MAX_SOCKET_VAL);
- m_SocketIDGenerator_init = m_SocketIDGenerator;
- // XXX An unlikely exception thrown from the below calls
- // might destroy the application before `main`. This shouldn't
- // be a problem in general.
- setupMutex(m_GCStopLock, "GCStop");
- setupCond(m_GCStopCond, "GCStop");
- setupMutex(m_GlobControlLock, "GlobControl");
- setupMutex(m_IDLock, "ID");
- setupMutex(m_InitLock, "Init");
- m_pCache = new CCache<CInfoBlock>;
- }
- srt::CUDTUnited::~CUDTUnited()
- {
- // Call it if it wasn't called already.
- // This will happen at the end of main() of the application,
- // when the user didn't call srt_cleanup().
- if (m_bGCStatus)
- {
- cleanup();
- }
- releaseMutex(m_GlobControlLock);
- releaseMutex(m_IDLock);
- releaseMutex(m_InitLock);
- // XXX There's some weird bug here causing this
- // to hangup on Windows. This might be either something
- // bigger, or some problem in pthread-win32. As this is
- // the application cleanup section, this can be temporarily
- // tolerated with simply exit the application without cleanup,
- // counting on that the system will take care of it anyway.
- #ifndef _WIN32
- releaseCond(m_GCStopCond);
- #endif
- releaseMutex(m_GCStopLock);
- delete m_pCache;
- }
- string srt::CUDTUnited::CONID(SRTSOCKET sock)
- {
- if (sock == 0)
- return "";
- std::ostringstream os;
- os << "@" << sock << ":";
- return os.str();
- }
- int srt::CUDTUnited::startup()
- {
- ScopedLock gcinit(m_InitLock);
- if (m_iInstanceCount++ > 0)
- return 1;
- // Global initialization code
- #ifdef _WIN32
- WORD wVersionRequested;
- WSADATA wsaData;
- wVersionRequested = MAKEWORD(2, 2);
- if (0 != WSAStartup(wVersionRequested, &wsaData))
- throw CUDTException(MJ_SETUP, MN_NONE, WSAGetLastError());
- #endif
- CCryptoControl::globalInit();
- PacketFilter::globalInit();
- if (m_bGCStatus)
- return 1;
- m_bClosing = false;
- if (!StartThread(m_GCThread, garbageCollect, this, "SRT:GC"))
- return -1;
- m_bGCStatus = true;
- HLOGC(inlog.Debug, log << "SRT Clock Type: " << SRT_SYNC_CLOCK_STR);
- return 0;
- }
- int srt::CUDTUnited::cleanup()
- {
- // IMPORTANT!!!
- // In this function there must be NO LOGGING AT ALL. This function may
- // potentially be called from within the global program destructor, and
- // therefore some of the facilities used by the logging system - including
- // the default std::cerr object bound to it by default, but also a different
- // stream that the user's app has bound to it, and which got destroyed
- // together with already exited main() - may be already deleted when
- // executing this procedure.
- ScopedLock gcinit(m_InitLock);
- if (--m_iInstanceCount > 0)
- return 0;
- if (!m_bGCStatus)
- return 0;
- {
- UniqueLock gclock(m_GCStopLock);
- m_bClosing = true;
- }
- // NOTE: we can do relaxed signaling here because
- // waiting on m_GCStopCond has a 1-second timeout,
- // after which the m_bClosing flag is cheched, which
- // is set here above. Worst case secenario, this
- // pthread_join() call will block for 1 second.
- CSync::notify_one_relaxed(m_GCStopCond);
- m_GCThread.join();
- m_bGCStatus = false;
- // Global destruction code
- #ifdef _WIN32
- WSACleanup();
- #endif
- return 0;
- }
- SRTSOCKET srt::CUDTUnited::generateSocketID(bool for_group)
- {
- ScopedLock guard(m_IDLock);
- int sockval = m_SocketIDGenerator - 1;
- // First problem: zero-value should be avoided by various reasons.
- if (sockval <= 0)
- {
- // We have a rollover on the socket value, so
- // definitely we haven't made the Columbus mistake yet.
- m_SocketIDGenerator = MAX_SOCKET_VAL;
- sockval = MAX_SOCKET_VAL;
- }
- // Check all sockets if any of them has this value.
- // Socket IDs are begin created this way:
- //
- // Initial random
- // |
- // |
- // |
- // |
- // ...
- // The only problem might be if the number rolls over
- // and reaches the same value from the opposite side.
- // This is still a valid socket value, but this time
- // we have to check, which sockets have been used already.
- if (sockval == m_SocketIDGenerator_init)
- {
- // Mark that since this point on the checks for
- // whether the socket ID is in use must be done.
- m_SocketIDGenerator_init = 0;
- }
- // This is when all socket numbers have been already used once.
- // This may happen after many years of running an application
- // constantly when the connection breaks and gets restored often.
- if (m_SocketIDGenerator_init == 0)
- {
- int startval = sockval;
- for (;;) // Roll until an unused value is found
- {
- enterCS(m_GlobControlLock);
- const bool exists =
- #if ENABLE_BONDING
- for_group
- ? m_Groups.count(sockval | SRTGROUP_MASK)
- :
- #endif
- m_Sockets.count(sockval);
- leaveCS(m_GlobControlLock);
- if (exists)
- {
- // The socket value is in use.
- --sockval;
- if (sockval <= 0)
- sockval = MAX_SOCKET_VAL;
- // Before continuing, check if we haven't rolled back to start again
- // This is virtually impossible, so just make an RTI error.
- if (sockval == startval)
- {
- // Of course, we don't lack memory, but actually this is so impossible
- // that a complete memory extinction is much more possible than this.
- // So treat this rather as a formal fallback for something that "should
- // never happen". This should make the socket creation functions, from
- // socket_create and accept, return this error.
- m_SocketIDGenerator = sockval + 1; // so that any next call will cause the same error
- throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
- }
- // try again, if this is a free socket
- continue;
- }
- // No socket found, this ID is free to use
- m_SocketIDGenerator = sockval;
- break;
- }
- }
- else
- {
- m_SocketIDGenerator = sockval;
- }
- // The socket value counter remains with the value rolled
- // without the group bit set; only the returned value may have
- // the group bit set.
- if (for_group)
- sockval = m_SocketIDGenerator | SRTGROUP_MASK;
- else
- sockval = m_SocketIDGenerator;
- LOGC(smlog.Debug, log << "generateSocketID: " << (for_group ? "(group)" : "") << ": @" << sockval);
- return sockval;
- }
- SRTSOCKET srt::CUDTUnited::newSocket(CUDTSocket** pps)
- {
- // XXX consider using some replacement of std::unique_ptr
- // so that exceptions will clean up the object without the
- // need for a dedicated code.
- CUDTSocket* ns = NULL;
- try
- {
- ns = new CUDTSocket;
- }
- catch (...)
- {
- delete ns;
- throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
- }
- try
- {
- ns->m_SocketID = generateSocketID();
- }
- catch (...)
- {
- delete ns;
- throw;
- }
- ns->m_Status = SRTS_INIT;
- ns->m_ListenSocket = 0;
- ns->core().m_SocketID = ns->m_SocketID;
- ns->core().m_pCache = m_pCache;
- try
- {
- HLOGC(smlog.Debug, log << CONID(ns->m_SocketID) << "newSocket: mapping socket " << ns->m_SocketID);
- // protect the m_Sockets structure.
- ScopedLock cs(m_GlobControlLock);
- m_Sockets[ns->m_SocketID] = ns;
- }
- catch (...)
- {
- // failure and rollback
- delete ns;
- ns = NULL;
- throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
- }
- if (pps)
- *pps = ns;
- return ns->m_SocketID;
- }
- // Processes a handshake that arrived at the listener socket `listen` and,
- // when the request is acceptable, creates, maps and connects a new socket
- // for the requesting peer.
- //
- // Parameters:
- //   listen  - ID of the listener socket that received the handshake.
- //   peer    - address the handshake came from.
- //   hspkt   - the handshake packet, passed on to the accept hook and to
- //             acceptAndRespond().
- //   w_hs    - [in/out] handshake data; for a repeated request from a peer
- //             that is already connected it is rewritten with the data of
- //             the existing connection.
- //   w_error - [out] rejection reason, meaningful when -1 is returned.
- //   w_acpu  - [out] core of the already existing socket when the request
- //             is a repeated one (return value 0), NULL otherwise.
- //
- // Returns:
- //    1 - a new socket was created and connected,
- //    0 - the request repeats an already working connection,
- //   -1 - the request was rejected or failed (see w_error).
- //
- // Failures are reported through the return value instead of exceptions
- // because this function runs in the listener loop.
- int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
- const sockaddr_any& peer,
- const CPacket& hspkt,
- CHandShake& w_hs,
- int& w_error,
- CUDT*& w_acpu)
- {
- CUDTSocket* ns = NULL;
- w_acpu = NULL;
- w_error = SRT_REJ_IPE;
- // Can't manage this error through an exception because this is
- // running in the listener loop.
- CUDTSocket* ls = locateSocket(listen);
- if (!ls)
- {
- LOGC(cnlog.Error, log << "IPE: newConnection by listener socket id=" << listen << " which DOES NOT EXIST.");
- return -1;
- }
- HLOGC(cnlog.Debug,
- log << "newConnection: creating new socket after listener @" << listen
- << " contacted with backlog=" << ls->m_uiBackLog);
- // if this connection has already been processed
- if ((ns = locatePeer(peer, w_hs.m_iID, w_hs.m_iISN)) != NULL)
- {
- if (ns->core().m_bBroken)
- {
- // last connection from the "peer" address has been broken
- ns->setClosed();
- ScopedLock acceptcg(ls->m_AcceptLock);
- ls->m_QueuedSockets.erase(ns->m_SocketID);
- }
- else
- {
- // connection already exist, this is a repeated connection request
- // respond with existing HS information
- HLOGC(cnlog.Debug, log << "newConnection: located a WORKING peer @" << w_hs.m_iID << " - ADAPTING.");
- w_hs.m_iISN = ns->core().m_iISN;
- w_hs.m_iMSS = ns->core().MSS();
- w_hs.m_iFlightFlagSize = ns->core().m_config.iFlightFlagSize;
- w_hs.m_iReqType = URQ_CONCLUSION;
- w_hs.m_iID = ns->m_SocketID;
- // Report the original UDT because it will be
- // required to complete the HS data for conclusion response.
- w_acpu = &ns->core();
- return 0;
- // except for this situation a new connection should be started
- }
- }
- else
- {
- HLOGC(cnlog.Debug,
- log << "newConnection: NOT located any peer @" << w_hs.m_iID << " - resuming with initial connection.");
- }
- // exceeding backlog, refuse the connection request
- if (ls->m_QueuedSockets.size() >= ls->m_uiBackLog)
- {
- w_error = SRT_REJ_BACKLOG;
- LOGC(cnlog.Note, log << "newConnection: listen backlog=" << ls->m_uiBackLog << " EXCEEDED");
- return -1;
- }
- // From here on `ns` denotes the socket being created. Forget the broken
- // socket possibly found by locatePeer() above: it is still referenced by
- // the socket containers, so the handler below must not delete it when the
- // allocation throws before `ns` gets reassigned.
- ns = NULL;
- try
- {
- ns = new CUDTSocket(*ls);
- // No need to check the peer, this is the address from which the request has come.
- ns->m_PeerAddr = peer;
- }
- catch (...)
- {
- w_error = SRT_REJ_RESOURCE;
- // `ns` is either NULL (allocation failed) or the newly created object.
- delete ns;
- LOGC(cnlog.Error, log << "IPE: newConnection: unexpected exception (probably std::bad_alloc)");
- return -1;
- }
- ns->core().m_RejectReason = SRT_REJ_UNKNOWN; // pre-set a universal value
- try
- {
- ns->m_SocketID = generateSocketID();
- }
- catch (const CUDTException&)
- {
- LOGC(cnlog.Fatal, log << "newConnection: IPE: all sockets occupied? Last gen=" << m_SocketIDGenerator);
- // generateSocketID throws exception, which can be naturally handled
- // when the call is derived from the API call, but here it's called
- // internally in response to receiving a handshake. It must be handled
- // here and turned into an erroneous return value.
- delete ns;
- return -1;
- }
- ns->m_ListenSocket = listen;
- ns->core().m_SocketID = ns->m_SocketID;
- ns->m_PeerID = w_hs.m_iID;
- ns->m_iISN = w_hs.m_iISN;
- HLOGC(cnlog.Debug,
- log << "newConnection: DATA: lsnid=" << listen << " id=" << ns->core().m_SocketID
- << " peerid=" << ns->core().m_PeerID << " ISN=" << ns->m_iISN);
- // 0 means success; a positive value selects the rollback path at
- // ERR_ROLLBACK and indexes the `why` table used for logging there.
- int error = 0;
- bool should_submit_to_accept = true;
- // Set the error code for all prospective problems below.
- // It won't be interpreted when result was successful.
- w_error = SRT_REJ_RESOURCE;
- // These can throw exception only when the memory allocation failed.
- // CUDT::connect() translates exception into CUDTException.
- // CUDT::open() may only throw original std::bad_alloc from new.
- // This is only to make the library extra safe (when your machine lacks
- // memory, it will continue to work, but fail to accept connection).
- try
- {
- // This assignment must happen b4 the call to CUDT::connect() because
- // this call causes sending the SRT Handshake through this socket.
- // Without this mapping the socket cannot be found and therefore
- // the SRT Handshake message would fail.
- HLOGC(cnlog.Debug, log <<
- "newConnection: incoming " << peer.str() << ", mapping socket " << ns->m_SocketID);
- {
- ScopedLock cg(m_GlobControlLock);
- m_Sockets[ns->m_SocketID] = ns;
- }
- if (ls->core().m_cbAcceptHook)
- {
- if (!ls->core().runAcceptHook(&ns->core(), peer.get(), w_hs, hspkt))
- {
- // The hook refused the connection; report the reason it has set.
- w_error = ns->core().m_RejectReason;
- error = 1;
- goto ERR_ROLLBACK;
- }
- }
- // bind to the same addr of listening socket
- ns->core().open();
- if (!updateListenerMux(ns, ls))
- {
- // This is highly unlikely if not impossible, but there's
- // a theoretical runtime chance of failure so it should be
- // handled
- ns->core().m_RejectReason = SRT_REJ_IPE;
- throw false; // let it jump directly into the omni exception handler
- }
- ns->core().acceptAndRespond(ls->m_SelfAddr, peer, hspkt, (w_hs));
- }
- catch (...)
- {
- // Extract the error that was set in this new failed entity.
- w_error = ns->core().m_RejectReason;
- error = 1;
- goto ERR_ROLLBACK;
- }
- ns->m_Status = SRTS_CONNECTED;
- // copy address information of local node
- // Precisely, what happens here is:
- // - Get the IP address and port from the system database
- ns->core().m_pSndQueue->m_pChannel->getSockAddr((ns->m_SelfAddr));
- // - OVERWRITE just the IP address itself by a value taken from piSelfIP
- // (the family is used exactly as the one taken from what has been returned
- // by getsockaddr)
- CIPAddress::pton((ns->m_SelfAddr), ns->core().m_piSelfIP, peer);
- {
- // protect the m_PeerRec structure (and group existence)
- ScopedLock glock(m_GlobControlLock);
- try
- {
- HLOGC(cnlog.Debug, log << "newConnection: mapping peer " << ns->m_PeerID
- << " to that socket (" << ns->m_SocketID << ")");
- m_PeerRec[ns->getPeerSpec()].insert(ns->m_SocketID);
- }
- catch (...)
- {
- LOGC(cnlog.Error, log << "newConnection: error when mapping peer!");
- error = 2;
- }
- // The access to m_GroupOf should be also protected, as the group
- // could be requested deletion in the meantime. This will hold any possible
- // removal from group and resetting m_GroupOf field.
- #if ENABLE_BONDING
- if (ns->m_GroupOf)
- {
- // XXX this might require another check of group type.
- // For redundancy group, at least, update the status in the group
- CUDTGroup* g = ns->m_GroupOf;
- ScopedLock grlock(g->m_GroupLock);
- if (g->m_bClosing)
- {
- error = 1; // "INTERNAL REJECTION"
- goto ERR_ROLLBACK;
- }
- // Check if this is the first socket in the group.
- // If so, give it up to accept, otherwise just do nothing
- // The client will be informed about the newly added connection at the
- // first moment when attempting to get the group status.
- for (CUDTGroup::gli_t gi = g->m_Group.begin(); gi != g->m_Group.end(); ++gi)
- {
- if (gi->laststatus == SRTS_CONNECTED)
- {
- HLOGC(cnlog.Debug,
- log << "Found another connected socket in the group: $" << gi->id
- << " - socket will be NOT given up for accepting");
- should_submit_to_accept = false;
- break;
- }
- }
- // Update the status in the group so that the next
- // operation can include the socket in the group operation.
- CUDTGroup::SocketData* gm = ns->m_GroupMemberData;
- HLOGC(cnlog.Debug,
- log << "newConnection(GROUP): Socket @" << ns->m_SocketID << " BELONGS TO $" << g->id() << " - will "
- << (should_submit_to_accept ? "" : "NOT ") << "report in accept");
- gm->sndstate = SRT_GST_IDLE;
- gm->rcvstate = SRT_GST_IDLE;
- gm->laststatus = SRTS_CONNECTED;
- if (!g->m_bConnected)
- {
- HLOGC(cnlog.Debug, log << "newConnection(GROUP): First socket connected, SETTING GROUP CONNECTED");
- g->m_bConnected = true;
- }
- // XXX PROBLEM!!! These events are subscribed here so that this is done once, lazily,
- // but groupwise connections could be accepted from multiple listeners for the same group!
- // m_listener MUST BE A CONTAINER, NOT POINTER!!!
- // ALSO: Maybe checking "the same listener" is not necessary as subscription may be done
- // multiple times anyway?
- if (!g->m_listener)
- {
- // Newly created group from the listener, which hasn't yet
- // the listener set.
- g->m_listener = ls;
- // Listen on both first connected socket and continued sockets.
- // This might help with jump-over situations, and in regular continued
- // sockets the IN event won't be reported anyway.
- int listener_modes = SRT_EPOLL_ACCEPT | SRT_EPOLL_UPDATE;
- epoll_add_usock_INTERNAL(g->m_RcvEID, ls, &listener_modes);
- // This listening should be done always when a first connected socket
- // appears as accepted off the listener. This is for the sake of swait() calls
- // inside the group receiving and sending functions so that they get
- // interrupted when a new socket is connected.
- }
- // Add also per-direction subscription for the about-to-be-accepted socket.
- // Both first accepted socket that makes the group-accept and every next
- // socket that adds a new link.
- int read_modes = SRT_EPOLL_IN | SRT_EPOLL_ERR;
- int write_modes = SRT_EPOLL_OUT | SRT_EPOLL_ERR;
- epoll_add_usock_INTERNAL(g->m_RcvEID, ns, &read_modes);
- epoll_add_usock_INTERNAL(g->m_SndEID, ns, &write_modes);
- // With app reader, do not set groupPacketArrival (block the
- // provider array feature completely for now).
- /* SETUP HERE IF NEEDED
- ns->core().m_cbPacketArrival.set(ns->m_pUDT, &CUDT::groupPacketArrival);
- */
- }
- else
- {
- HLOGC(cnlog.Debug, log << "newConnection: Socket @" << ns->m_SocketID << " is not in a group");
- }
- #endif
- }
- if (should_submit_to_accept)
- {
- enterCS(ls->m_AcceptLock);
- try
- {
- ls->m_QueuedSockets.insert(ns->m_SocketID);
- }
- catch (...)
- {
- LOGC(cnlog.Error, log << "newConnection: error when queuing socket!");
- error = 3;
- }
- leaveCS(ls->m_AcceptLock);
- HLOGC(cnlog.Debug, log << "ACCEPT: new socket @" << ns->m_SocketID << " submitted for acceptance");
- // acknowledge users waiting for new connections on the listening socket
- m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_ACCEPT, true);
- CGlobEvent::triggerEvent();
- // XXX the exact value of 'error' is ignored
- if (error > 0)
- {
- goto ERR_ROLLBACK;
- }
- // wake up a waiting accept() call
- CSync::lock_notify_one(ls->m_AcceptCond, ls->m_AcceptLock);
- }
- else
- {
- HLOGC(cnlog.Debug,
- log << "ACCEPT: new socket @" << ns->m_SocketID
- << " NOT submitted to acceptance, another socket in the group is already connected");
- // acknowledge INTERNAL users waiting for new connections on the listening socket
- // that are reported when a new socket is connected within an already connected group.
- m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_UPDATE, true);
- CGlobEvent::triggerEvent();
- }
- // Common exit point: with error == 0 this falls through to the successful
- // return; otherwise the new socket is closed and moved to m_ClosedSockets.
- ERR_ROLLBACK:
- // XXX the exact value of 'error' is ignored
- if (error > 0)
- {
- #if ENABLE_LOGGING
- static const char* why[] = {
- "UNKNOWN ERROR", "INTERNAL REJECTION", "IPE when mapping a socket", "IPE when inserting a socket"};
- LOGC(cnlog.Warn,
- log << CONID(ns->m_SocketID) << "newConnection: connection rejected due to: " << why[error] << " - "
- << RequestTypeStr(URQFailure(w_error)));
- #endif
- SRTSOCKET id = ns->m_SocketID;
- ns->core().closeInternal();
- ns->setClosed();
- // The mapped socket should be now unmapped to preserve the situation that
- // was in the original UDT code.
- // In SRT additionally the acceptAndRespond() function (it was called probably
- // connect() in UDT code) may fail, in which case this socket should not be
- // further processed and should be removed.
- {
- ScopedLock cg(m_GlobControlLock);
- #if ENABLE_BONDING
- if (ns->m_GroupOf)
- {
- HLOGC(smlog.Debug,
- log << "@" << ns->m_SocketID << " IS MEMBER OF $" << ns->m_GroupOf->id()
- << " - REMOVING FROM GROUP");
- ns->removeFromGroup(true);
- }
- #endif
- m_Sockets.erase(id);
- m_ClosedSockets[id] = ns;
- }
- return -1;
- }
- return 1;
- }
- // Static entry point: hands the request over to the global CUDTUnited
- // instance and passes its result back unchanged.
- int srt::CUDT::installAcceptHook(SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
- {
- const int status = uglobal().installAcceptHook(lsn, hook, opaq);
- return status;
- }
- // Installs `hook` (called with `opaq`) as the accept hook of socket `lsn`.
- // Returns 0 on success. A CUDTException raised on the way (the socket is
- // located with ERH_THROW) is stored as the thread-local error and turned
- // into SRT_ERROR.
- int srt::CUDTUnited::installAcceptHook(const SRTSOCKET lsn, srt_listen_callback_fn* hook, void* opaq)
- {
- int result = 0;
- try
- {
- locateSocket(lsn, ERH_THROW)->core().installAcceptHook(hook, opaq);
- }
- catch (CUDTException& e)
- {
- SetThreadLocalError(e);
- result = SRT_ERROR;
- }
- return result;
- }
- // Static entry point: hands the request over to the global CUDTUnited
- // instance and passes its result back unchanged.
- int srt::CUDT::installConnectHook(SRTSOCKET lsn, srt_connect_callback_fn* hook, void* opaq)
- {
- const int status = uglobal().installConnectHook(lsn, hook, opaq);
- return status;
- }
- // Installs `hook` (called with `opaq`) as the connect hook of `u`, which
- // may be a socket ID or, with ENABLE_BONDING, a group ID.
- // Returns 0 on success. A CUDTException raised on the way is stored as the
- // thread-local error and turned into SRT_ERROR.
- int srt::CUDTUnited::installConnectHook(const SRTSOCKET u, srt_connect_callback_fn* hook, void* opaq)
- {
- try
- {
- #if ENABLE_BONDING
- const bool is_group = (u & SRTGROUP_MASK) != 0;
- if (is_group)
- {
- // The keeper lives only for the time of installing the hook.
- GroupKeeper k(*this, u, ERH_THROW);
- k.group->installConnectHook(hook, opaq);
- return 0;
- }
- #endif
- locateSocket(u, ERH_THROW)->core().installConnectHook(hook, opaq);
- return 0;
- }
- catch (CUDTException& e)
- {
- SetThreadLocalError(e);
- }
- // Reached only through the exception handler above.
- return SRT_ERROR;
- }
- // Reports the status of socket `u`: the socket's own status when it is
- // found in m_Sockets, SRTS_CLOSED when it is found only in m_ClosedSockets,
- // and SRTS_NONEXIST when it is in neither container.
- SRT_SOCKSTATUS srt::CUDTUnited::getStatus(const SRTSOCKET u)
- {
- // Both containers are inspected under the global control lock.
- ScopedLock cg(m_GlobControlLock);
- sockets_t::const_iterator found = m_Sockets.find(u);
- if (found != m_Sockets.end())
- return found->second->getStatus();
- const bool was_closed = m_ClosedSockets.find(u) != m_ClosedSockets.end();
- return was_closed ? SRTS_CLOSED : SRTS_NONEXIST;
- }
- // Binds socket `s` to the local address `name`: opens the core, attaches
- // the socket to a multiplexer through updateMux() and moves it to the
- // SRTS_OPENED state. Returns 0.
- // Throws CUDTException when the socket is not in the SRTS_INIT state, or
- // when binding to the IPv6 wildcard address while the IPv6-only setting
- // was left unset (-1).
- int srt::CUDTUnited::bind(CUDTSocket* s, const sockaddr_any& name)
- {
- ScopedLock cg(s->m_ControlLock);
- // Binding is possible only once, that is, only from the INIT state.
- const bool still_fresh = (s->m_Status == SRTS_INIT);
- if (!still_fresh)
- {
- throw CUDTException(MJ_NOTSUP, MN_NONE, 0);
- }
- const bool v6only_undecided_for_wildcard =
- s->core().m_config.iIpV6Only == -1 && name.family() == AF_INET6 && name.isany();
- if (v6only_undecided_for_wildcard)
- {
- // Binding to :: requires the V6ONLY option to be decided explicitly.
- HLOGP(smlog.Error,
- "bind: when binding to :: (IPv6 wildcard), SRTO_IPV6ONLY option must be set explicitly to 0 or 1");
- throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
- }
- s->core().open();
- updateMux(s, name);
- s->m_Status = SRTS_OPENED;
- // Record the local address as reported by the channel in use.
- s->core().m_pSndQueue->m_pChannel->getSockAddr((s->m_SelfAddr));
- return 0;
- }
- // Binds socket `s` on top of the already existing UDP socket `udpsock`:
- // reads its local address with getsockname(), opens the core, attaches the
- // socket to a multiplexer through updateMux() and moves it to the
- // SRTS_OPENED state. Returns 0.
- // Throws CUDTException when the socket is not in the SRTS_INIT state or
- // when getsockname() fails.
- int srt::CUDTUnited::bind(CUDTSocket* s, UDPSOCKET udpsock)
- {
- ScopedLock cg(s->m_ControlLock);
- // Binding is possible only once, that is, only from the INIT state.
- const bool still_fresh = (s->m_Status == SRTS_INIT);
- if (!still_fresh)
- {
- throw CUDTException(MJ_NOTSUP, MN_NONE, 0);
- }
- sockaddr_any name;
- // The whole object is offered as the buffer, which is large enough for
- // any family; the call presets the sa_family as well.
- socklen_t namelen = sizeof name;
- const int rc = ::getsockname(udpsock, &name.sa, &namelen);
- if (rc == -1)
- {
- throw CUDTException(MJ_NOTSUP, MN_INVAL);
- }
- // The address was extracted, so record its real size.
- name.len = namelen;
- s->core().open();
- updateMux(s, name, &udpsock);
- s->m_Status = SRTS_OPENED;
- // Record the local address as reported by the channel in use.
- s->core().m_pSndQueue->m_pChannel->getSockAddr(s->m_SelfAddr);
- return 0;
- }
- // Puts socket `u` into the listening state with the given `backlog`.
- // Returns 0, also when the socket is listening already.
- // Throws CUDTException when `backlog` is not positive, the socket ID is
- // invalid or unknown, the socket is not in the SRTS_OPENED state, or it is
- // configured for rendezvous; an exception from setListenState() is let
- // through.
- int srt::CUDTUnited::listen(const SRTSOCKET u, int backlog)
- {
- if (backlog <= 0)
- {
- throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
- }
- // The invalid ID never denotes a socket, so it is not even searched for.
- CUDTSocket* const s = (u == UDT::INVALID_SOCK) ? NULL : locateSocket(u);
- if (s == NULL)
- {
- throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
- }
- ScopedLock cg(s->m_ControlLock);
- // NOTE: only from this point the socket is protected against
- // simultaneous access. Before the lock was taken it might have been
- // closed or set listening by another thread, hence the checks below.
- if (s->m_Status == SRTS_LISTENING)
- {
- // Already listening, nothing to do.
- return 0;
- }
- if (s->m_Status != SRTS_OPENED)
- {
- // Only an OPENED socket can start listening.
- throw CUDTException(MJ_NOTSUP, MN_ISUNBOUND, 0);
- }
- // [[using assert(s->m_Status == OPENED)]];
- if (s->core().m_config.bRendezvous)
- {
- // Listening is not supported in rendezvous connection setup.
- throw CUDTException(MJ_NOTSUP, MN_ISRENDEZVOUS, 0);
- }
- s->m_uiBackLog = backlog;
- // [[using assert(s->m_Status == OPENED)]]; // (still, unchanged)
- // May throw CUDTException; the socket then remains in OPENED state.
- s->core().setListenState();
- s->m_Status = SRTS_LISTENING;
- return 0;
- }
- // Waits up to `msTimeOut` until any of the `lsize` listeners in
- // `listeners` is reported ready for accept, then performs accept() on the
- // first one reported and returns its result.
- // Throws CUDTException(MJ_AGAIN, MN_XMTIMEOUT) when the wait returned no
- // ready listener; exceptions from accept() are let through.
- SRTSOCKET srt::CUDTUnited::accept_bond(const SRTSOCKET listeners[], int lsize, int64_t msTimeOut)
- {
- CEPollDesc* ed = 0;
- int eid = m_EPoll.create(&ed);
- // The temporary epoll container must be released on every way out of
- // this function, including an exception.
- struct EidReleaser
- {
- CUDTUnited* owner;
- int id;
- EidReleaser(CUDTUnited* t, int e)
- : owner(t)
- , id(e)
- {
- }
- ~EidReleaser() { owner->m_EPoll.release(id); }
- } l_ar(this, eid);
- // Every listener is subscribed for the accept event.
- int events = SRT_EPOLL_ACCEPT;
- int idx = 0;
- while (idx < lsize)
- {
- srt_epoll_add_usock(eid, listeners[idx], &events);
- ++idx;
- }
- CEPoll::fmap_t st;
- m_EPoll.swait(*ed, (st), msTimeOut, true);
- if (st.empty())
- {
- // Sanity check
- throw CUDTException(MJ_AGAIN, MN_XMTIMEOUT, 0);
- }
- // More than one listener may be ready at a time; only the first one
- // found is served.
- int lsn = st.begin()->first;
- // The peer address is not reported by this call, so it lands in a
- // throw-away buffer.
- sockaddr_storage dummy;
- int outlen = sizeof dummy;
- return accept(lsn, ((sockaddr*)&dummy), (&outlen));
- }
- // Takes one pending connection off the queue of the listener `listen`.
- //
- // Parameters:
- //   listen     - ID of the listening socket.
- //   pw_addr    - [out, optional] buffer for the peer address.
- //   pw_addrlen - [in/out] size of that buffer, rewritten with the size of
- //                the address; required whenever pw_addr is given.
- //
- // Returns the ID of the accepted socket. With ENABLE_BONDING, when the
- // listener has iGroupConnect == 1 and the socket belongs to a group, the
- // group ID is returned instead.
- //
- // Throws CUDTException when the arguments are inconsistent, the listener
- // does not exist or is not listening, no connection is pending in
- // non-blocking mode, the listener got closed while waiting, the pending
- // socket is no longer found, or the address buffer is too small.
- SRTSOCKET srt::CUDTUnited::accept(const SRTSOCKET listen, sockaddr* pw_addr, int* pw_addrlen)
- {
- // An address buffer without its length cannot be filled in.
- if (!pw_addrlen && pw_addr)
- {
- LOGC(cnlog.Error, log << "srt_accept: provided address, but address length parameter is missing");
- throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
- }
- CUDTSocket* const ls = locateSocket(listen);
- if (!ls)
- {
- LOGC(cnlog.Error, log << "srt_accept: invalid listener socket ID value: " << listen);
- throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
- }
- // Accepting is possible only off a socket in the LISTENING state.
- if (ls->m_Status != SRTS_LISTENING)
- {
- LOGC(cnlog.Error, log << "srt_accept: socket @" << listen << " is not in listening state (forgot srt_listen?)");
- throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
- }
- // There is no accepting in the rendezvous connection setup.
- if (ls->core().m_config.bRendezvous)
- {
- LOGC(cnlog.Fatal,
- log << "CUDTUnited::accept: RENDEZVOUS flag passed through check in srt_listen when it set listen state");
- // srt_listen should have refused to set the listening state on a
- // rendezvous socket, so this is not expected to happen. The user
- // gets informed about the invalid state in the universal way.
- throw CUDTException(MJ_NOTSUP, MN_NOLISTEN, 0);
- }
- SRTSOCKET u = CUDT::INVALID_SOCK;
- // !!only one connection can be set up each time!!
- for (bool finished = false; !finished;)
- {
- UniqueLock accept_lock(ls->m_AcceptLock);
- CSync accept_sync(ls->m_AcceptCond, accept_lock);
- const bool listener_gone = (ls->m_Status != SRTS_LISTENING) || ls->core().m_bBroken;
- if (listener_gone)
- {
- // The listener has been closed; leave with no socket.
- finished = true;
- }
- else if (!ls->m_QueuedSockets.empty())
- {
- // Pick the first queued socket and take it off the queue.
- set<SRTSOCKET>::iterator first = ls->m_QueuedSockets.begin();
- u = *first;
- ls->m_QueuedSockets.erase(first);
- finished = true;
- }
- else if (!ls->core().m_config.bSynRecving)
- {
- // Non-blocking mode: do not wait for a connection.
- finished = true;
- }
- if (!finished && (ls->m_Status == SRTS_LISTENING))
- {
- accept_sync.wait();
- }
- // With nothing left in the queue the accept readiness is cleared.
- if (ls->m_QueuedSockets.empty())
- {
- m_EPoll.update_events(listen, ls->core().m_sPollID, SRT_EPOLL_ACCEPT, false);
- }
- }
- if (u == CUDT::INVALID_SOCK)
- {
- const bool nonblocking = !ls->core().m_config.bSynRecving;
- if (nonblocking)
- {
- // non-blocking receiving, no connection available
- LOGC(cnlog.Error, log << "srt_accept: no pending connection available at the moment");
- throw CUDTException(MJ_AGAIN, MN_RDAVAIL, 0);
- }
- // Blocking mode with no socket means the listener is closed.
- LOGC(cnlog.Error, log << "srt_accept: listener socket @" << listen << " is already closed");
- throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
- }
- CUDTSocket* const s = locateSocket(u);
- if (!s)
- {
- LOGC(cnlog.Error, log << "srt_accept: pending connection has unexpectedly closed");
- throw CUDTException(MJ_SETUP, MN_CLOSED, 0);
- }
- // Set properly the SRTO_GROUPCONNECT flag; it is switched on below
- // only when the group ID is returned.
- s->core().m_config.iGroupConnect = 0;
- // When the LISTENER has the SRTO_GROUPCONNECT flag set and the accepted
- // socket has joined the mirror group, THE GROUP ID is returned instead
- // of the socket ID.
- #if ENABLE_BONDING
- if (ls->core().m_config.iGroupConnect == 1 && s->m_GroupOf)
- {
- // The lock protects the group against deletion in the meantime.
- ScopedLock glock(m_GlobControlLock);
- // The membership is checked again under the lock; losing it here is
- // unlikely, but theoretically possible.
- if (!s->m_GroupOf)
- {
- LOGC(smlog.Error, log << "accept: IPE: socket's group deleted in the meantime of accept process???");
- }
- else
- {
- u = s->m_GroupOf->m_GroupID;
- s->core().m_config.iGroupConnect = 1; // should be derived from ls, but make sure
- // The connection is considered started at the moment when
- // the group ID is returned to the app caller.
- s->m_GroupOf->m_stats.tsLastSampleTime = steady_clock::now();
- }
- }
- #endif
- ScopedLock cg(s->m_ControlLock);
- if (pw_addr && pw_addrlen)
- {
- // The caller's buffer must be large enough for the peer address.
- const int len = s->m_PeerAddr.size();
- if (len > *pw_addrlen)
- {
- throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
- }
- memcpy((pw_addr), &s->m_PeerAddr, len);
- *pw_addrlen = len;
- }
- return u;
- }
- // Connects `u` to `tarname` using `srcname` as the local address; both
- // addresses are `namelen` bytes long.
- // For a socket ID the socket is bound to the source address and then
- // connected. With ENABLE_BONDING a group ID is accepted as well, in which
- // case a single member connection is made through singleMemberConnect().
- // Throws CUDTException on invalid arguments or an unknown socket ID.
- int srt::CUDTUnited::connect(SRTSOCKET u, const sockaddr* srcname, const sockaddr* tarname, int namelen)
- {
- // Both addresses are mandatory in this form of the call.
- const bool bad_call = !srcname || !tarname || namelen < int(sizeof(sockaddr_in));
- if (bad_call)
- {
- LOGC(aclog.Error,
- log << "connect(with source): invalid call: srcname=" << srcname << " tarname=" << tarname
- << " namelen=" << namelen);
- throw CUDTException(MJ_NOTSUP, MN_INVAL);
- }
- // An address object with zero length is rejected as invalid.
- sockaddr_any source_addr(srcname, namelen);
- if (0 == source_addr.len)
- {
- throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
- }
- sockaddr_any target_addr(tarname, namelen);
- if (0 == target_addr.len)
- {
- throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
- }
- #if ENABLE_BONDING
- // The ID may denote a group or a socket. For a group, a socket is
- // added to the group automatically.
- const bool is_group = (u & SRTGROUP_MASK) != 0;
- if (is_group)
- {
- GroupKeeper k(*this, u, ERH_THROW);
- // Note: forced_isn is ignored when connecting a group.
- // The group ALWAYS manages the ISN by itself: it is generated anew
- // for the very first socket and then derived by all sockets in
- // the group.
- SRT_SOCKGROUPCONFIG gd[1] = {srt_prepare_endpoint(srcname, tarname, namelen)};
- // With exactly one target only this very target can be returned as
- // a socket, so the array rewritten back can be ignored.
- return singleMemberConnect(k.group, gd);
- }
- #endif
- CUDTSocket* const s = locateSocket(u);
- if (!s)
- {
- throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
- }
- // A single socket is simply bound first and connected next.
- bind(s, source_addr);
- return connectIn(s, target_addr, SRT_SEQNO_NONE);
- }
- // Connects `u` to the address `name` of `namelen` bytes, passing
- // `forced_isn` on to connectIn() for a single socket.
- // With ENABLE_BONDING a group ID is accepted as well, in which case a
- // single member connection is made through singleMemberConnect().
- // Throws CUDTException on invalid arguments or an unknown socket ID.
- int srt::CUDTUnited::connect(const SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
- {
- const bool bad_call = !name || namelen < int(sizeof(sockaddr_in));
- if (bad_call)
- {
- LOGC(aclog.Error, log << "connect(): invalid call: name=" << name << " namelen=" << namelen);
- throw CUDTException(MJ_NOTSUP, MN_INVAL);
- }
- // An address object with zero length is rejected as invalid.
- sockaddr_any target_addr(name, namelen);
- if (0 == target_addr.len)
- {
- throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
- }
- #if ENABLE_BONDING
- // The ID may denote a group or a socket. For a group, a socket is
- // added to the group automatically.
- const bool is_group = (u & SRTGROUP_MASK) != 0;
- if (is_group)
- {
- GroupKeeper k(*this, u, ERH_THROW);
- // Note: forced_isn is ignored when connecting a group.
- // The group ALWAYS manages the ISN by itself: it is generated anew
- // for the very first socket and then derived by all sockets in
- // the group.
- SRT_SOCKGROUPCONFIG gd[1] = {srt_prepare_endpoint(NULL, name, namelen)};
- return singleMemberConnect(k.group, gd);
- }
- #endif
- CUDTSocket* const s = locateSocket(u);
- if (s == NULL)
- {
- throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
- }
- return connectIn(s, target_addr, forced_isn);
- }
- #if ENABLE_BONDING
- // Connects exactly one member described by `gd` within the group `pg`.
- // Returns the result of groupConnect() unless that is -1; in that case the
- // error code recorded in `gd` (SRT_EINVPARAM if none was recorded) is split
- // into its major and minor parts and reported through CUDT::APIError().
- int srt::CUDTUnited::singleMemberConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* gd)
- {
- const int gstat = groupConnect(pg, gd, 1);
- if (gstat != -1)
- {
- return gstat;
- }
- // There is only one element here, so it is referred to directly.
- // Sanity check: a failure must not be reported with a success code.
- if (gd->errorcode == SRT_SUCCESS)
- {
- gd->errorcode = SRT_EINVPARAM;
- }
- const int code = gd->errorcode;
- const CodeMajor mj = CodeMajor(code / 1000);
- const CodeMinor mn = CodeMinor(code % 1000);
- return CUDT::APIError(mj, mn);
- }
- // [[using assert(pg->m_iBusy > 0)]]
- int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, int arraysize)
- {
- CUDTGroup& g = *pg;
- SRT_ASSERT(g.m_iBusy > 0);
- // Check and report errors on data brought in by srt_prepare_endpoint,
- // as the latter function has no possibility to report errors.
- for (int tii = 0; tii < arraysize; ++tii)
- {
- if (targets[tii].srcaddr.ss_family != targets[tii].peeraddr.ss_family)
- {
- LOGC(aclog.Error, log << "srt_connect/group: family differs on source and target address");
- throw CUDTException(MJ_NOTSUP, MN_INVAL);
- }
- if (targets[tii].weight > CUDT::MAX_WEIGHT)
- {
- LOGC(aclog.Error, log << "srt_connect/group: weight value must be between 0 and " << (+CUDT::MAX_WEIGHT));
- throw CUDTException(MJ_NOTSUP, MN_INVAL);
- }
- }
- // If the open state switched to OPENED, the blocking mode
- // must make it wait for connecting it. Doing connect when the
- // group is already OPENED returns immediately, regardless if the
- // connection is going to later succeed or fail (this will be
- // known in the group state information).
- bool block_new_opened = !g.m_bOpened && g.m_bSynRecving;
- const bool was_empty = g.groupEmpty();
- // In case the group was retried connection, clear first all epoll readiness.
- const int ncleared = m_EPoll.update_events(g.id(), g.m_sPollID, SRT_EPOLL_ERR, false);
- if (was_empty || ncleared)
- {
- HLOGC(aclog.Debug,
- log << "srt_connect/group: clearing IN/OUT because was_empty=" << was_empty
- << " || ncleared=" << ncleared);
- // IN/OUT only in case when the group is empty, otherwise it would
- // clear out correct readiness resulting from earlier calls.
- // This also should happen if ERR flag was set, as IN and OUT could be set, too.
- m_EPoll.update_events(g.id(), g.m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT, false);
- }
- SRTSOCKET retval = -1;
- int eid = -1;
- int connect_modes = SRT_EPOLL_CONNECT | SRT_EPOLL_ERR;
- if (block_new_opened)
- {
- // Create this eid only to block-wait for the first
- // connection.
- eid = srt_epoll_create();
- }
- // Use private map to avoid searching in the
- // overall map.
- map<SRTSOCKET, CUDTSocket*> spawned;
- HLOGC(aclog.Debug,
- log << "groupConnect: will connect " << arraysize << " links and "
- << (block_new_opened ? "BLOCK until any is ready" : "leave the process in background"));
- for (int tii = 0; tii < arraysize; ++tii)
- {
- sockaddr_any target_addr(targets[tii].peeraddr);
- sockaddr_any source_addr(targets[tii].srcaddr);
- SRTSOCKET& sid_rloc = targets[tii].id;
- int& erc_rloc = targets[tii].errorcode;
- erc_rloc = SRT_SUCCESS; // preinitialized
- HLOGC(aclog.Debug, log << "groupConnect: taking on " << sockaddr_any(targets[tii].peeraddr).str());
- CUDTSocket* ns = 0;
- // NOTE: After calling newSocket, the socket is mapped into m_Sockets.
- // It must be MANUALLY removed from this list in case we need it deleted.
- SRTSOCKET sid = newSocket(&ns);
- if (pg->m_cbConnectHook)
- {
- // Derive the connect hook by the socket, if set on the group
- ns->core().m_cbConnectHook = pg->m_cbConnectHook;
- }
- SRT_SocketOptionObject* config = targets[tii].config;
- // XXX Support non-blocking mode:
- // If the group has nonblocking set for connect (SNDSYN),
- // then it must set so on the socket. Then, the connection
- // process is asynchronous. The socket appears first as
- // GST_PENDING state, and only after the socket becomes
- // connected does its status in the group turn into GST_IDLE.
- // Set all options that were requested by the options set on a group
- // prior to connecting.
- string error_reason SRT_ATR_UNUSED;
- try
- {
- for (size_t i = 0; i < g.m_config.size(); ++i)
- {
- HLOGC(aclog.Debug, log << "groupConnect: OPTION @" << sid << " #" << g.m_config[i].so);
- error_reason = "setting group-derived option: #" + Sprint(g.m_config[i].so);
- ns->core().setOpt(g.m_config[i].so, &g.m_config[i].value[0], (int)g.m_config[i].value.size());
- }
- // Do not try to set a user option if failed already.
- if (config)
- {
- error_reason = "user option";
- ns->core().applyMemberConfigObject(*config);
- }
- error_reason = "bound address";
- // We got it. Bind the socket, if the source address was set
- if (!source_addr.empty())
- bind(ns, source_addr);
- }
- catch (CUDTException& e)
- {
- // Just notify the problem, but the loop must continue.
- // Set the original error as reported.
- targets[tii].errorcode = e.getErrorCode();
- LOGC(aclog.Error, log << "srt_connect_group: failed to set " << error_reason);
- }
- catch (...)
- {
- // Set the general EINVPARAM - this error should never happen
- LOGC(aclog.Error, log << "IPE: CUDT::setOpt reported unknown exception");
- targets[tii].errorcode = SRT_EINVPARAM;
- }
- // Add socket to the group.
- // Do it after setting all stored options, as some of them may
- // influence some group data.
- srt::groups::SocketData data = srt::groups::prepareSocketData(ns);
- if (targets[tii].token != -1)
- {
- // Reuse the token, if specified by the caller
- data.token = targets[tii].token;
- }
- else
- {
- // Otherwise generate and write back the token
- data.token = CUDTGroup::genToken();
- targets[tii].token = data.token;
- }
- {
- ScopedLock cs(m_GlobControlLock);
- if (m_Sockets.count(sid) == 0)
- {
- HLOGC(aclog.Debug, log << "srt_connect_group: socket @" << sid << " deleted in process");
- // Someone deleted the socket in the meantime?
- // Unlikely, but possible in theory.
- // Don't delete anyhting - it's alreay done.
- continue;
- }
- // There's nothing wrong with preparing the data first
- // even if this happens for nothing. But now, under the lock
- // and after checking that the socket still exists, check now
- // if this succeeded, and then also if the group is still usable.
- // The group will surely exist because it's set busy, until the
- // end of this function. But it might be simultaneously requested closed.
- bool proceed = true;
- if (targets[tii].errorcode != SRT_SUCCESS)
- {
- HLOGC(aclog.Debug,
- log << "srt_connect_group: not processing @" << sid << " due to error in setting options");
- proceed = false;
- }
- if (g.m_bClosing)
- {
- HLOGC(aclog.Debug,
- log << "srt_connect_group: not processing @" << sid << " due to CLOSED GROUP $" << g.m_GroupID);
- proceed = false;
- }
- if (proceed)
- {
- CUDTGroup::SocketData* f = g.add(data);
- ns->m_GroupMemberData = f;
- ns->m_GroupOf = &g;
- f->weight = targets[tii].weight;
- HLOGC(aclog.Debug, log << "srt_connect_group: socket @" << sid << " added to group $" << g.m_GroupID);
- }
- else
- {
- targets[tii].id = CUDT::INVALID_SOCK;
- delete ns;
- m_Sockets.erase(sid);
- // If failed to set options, then do not continue
- // neither with binding, nor with connecting.
- continue;
- }
- }
- // XXX This should be reenabled later, this should
- // be probably still in use to exchange information about
- // packets asymmetrically lost. But for no other purpose.
- /*
- ns->core().m_cbPacketArrival.set(ns->m_pUDT, &CUDT::groupPacketArrival);
- */
- int isn = g.currentSchedSequence();
- // Set it the groupconnect option, as all in-group sockets should have.
- ns->core().m_config.iGroupConnect = 1;
- // Every group member will have always nonblocking
- // (this implies also non-blocking connect/accept).
- // The group facility functions will block when necessary
- // using epoll_wait.
- ns->core().m_config.bSynRecving = false;
- ns->core().m_config.bSynSending = false;
- HLOGC(aclog.Debug, log << "groupConnect: NOTIFIED AS PENDING @" << sid << " both read and write");
- // If this socket is not to block the current connect process,
- // it may still be needed for the further check if the redundant
- // connection succeeded or failed and whether the new socket is
- // ready to use or needs to be closed.
- epoll_add_usock_INTERNAL(g.m_SndEID, ns, &connect_modes);
- epoll_add_usock_INTERNAL(g.m_RcvEID, ns, &connect_modes);
- // Adding a socket on which we need to block to BOTH these tracking EIDs
- // and the blocker EID. We'll simply remove from them later all sockets that
- // got connected state or were broken.
- if (block_new_opened)
- {
- HLOGC(aclog.Debug, log << "groupConnect: WILL BLOCK on @" << sid << " until connected");
- epoll_add_usock_INTERNAL(eid, ns, &connect_modes);
- }
- // And connect
- try
- {
- HLOGC(aclog.Debug, log << "groupConnect: connecting a new socket with ISN=" << isn);
- connectIn(ns, target_addr, isn);
- }
- catch (const CUDTException& e)
- {
- LOGC(aclog.Error,
- log << "groupConnect: socket @" << sid << " in group " << pg->id() << " failed to connect");
- // We know it does belong to a group.
- // Remove it first because this involves a mutex, and we want
- // to avoid locking more than one mutex at a time.
- erc_rloc = e.getErrorCode();
- targets[tii].errorcode = e.getErrorCode();
- targets[tii].id = CUDT::INVALID_SOCK;
- ScopedLock cl(m_GlobControlLock);
- ns->removeFromGroup(false);
- m_Sockets.erase(ns->m_SocketID);
- // Intercept to delete the socket on failure.
- delete ns;
- continue;
- }
- catch (...)
- {
- LOGC(aclog.Fatal, log << "groupConnect: IPE: UNKNOWN EXCEPTION from connectIn");
- targets[tii].errorcode = SRT_ESYSOBJ;
- targets[tii].id = CUDT::INVALID_SOCK;
- ScopedLock cl(m_GlobControlLock);
- ns->removeFromGroup(false);
- m_Sockets.erase(ns->m_SocketID);
- // Intercept to delete the socket on failure.
- delete ns;
- // Do not use original exception, it may crash off a C API.
- throw CUDTException(MJ_SYSTEMRES, MN_OBJECT);
- }
- SRT_SOCKSTATUS st;
- {
- ScopedLock grd(ns->m_ControlLock);
- st = ns->getStatus();
- }
- {
- // NOTE: Not applying m_GlobControlLock because the group is now
- // set busy, so it won't be deleted, even if it was requested to be closed.
- ScopedLock grd(g.m_GroupLock);
- if (!ns->m_GroupOf)
- {
- // The situation could get changed between the unlock and lock of m_GroupLock.
- // This must be checked again.
- // If a socket has been removed from group, it means that some other thread is
- // currently trying to delete the socket. Therefore it doesn't have, and even shouldn't,
- // be deleted here. Just exit with error report.
- LOGC(aclog.Error, log << "groupConnect: self-created member socket deleted during process, SKIPPING.");
- // Do not report the error from here, just ignore this socket.
- continue;
- }
- // If m_GroupOf is not NULL, the m_IncludedIter is still valid.
- CUDTGroup::SocketData* f = ns->m_GroupMemberData;
- // Now under a group lock, we need to make sure the group isn't being closed
- // in order not to add a socket to a dead group.
- if (g.m_bClosing)
- {
- LOGC(aclog.Error, log << "groupConnect: group deleted while connecting; breaking the process");
- // Set the status as pending so that the socket is taken care of later.
- // Note that all earlier sockets that were processed in this loop were either
- // set BROKEN or PENDING.
- f->sndstate = SRT_GST_PENDING;
- f->rcvstate = SRT_GST_PENDING;
- retval = -1;
- break;
- }
- HLOGC(aclog.Debug,
- log << "groupConnect: @" << sid << " connection successful, setting group OPEN (was "
- << (g.m_bOpened ? "ALREADY" : "NOT") << "), will " << (block_new_opened ? "" : "NOT ")
- << "block the connect call, status:" << SockStatusStr(st));
- // XXX OPEN OR CONNECTED?
- // BLOCK IF NOT OPEN OR BLOCK IF NOT CONNECTED?
- //
- // What happens to blocking when there are 2 connections
- // pending, about to be broken, and srt_connect() is called again?
- // SHOULD BLOCK the latter, even though is OPEN.
- // Or, OPEN should be removed from here and srt_connect(_group)
- // should always block if the group doesn't have at least 1 connected link
- g.m_bOpened = true;
- g.m_stats.tsLastSampleTime = steady_clock::now();
- f->laststatus = st;
- // Check the socket status and update it.
- // Turn the group state of the socket to IDLE only if
- // connection is established or in progress
- f->agent = source_addr;
- f->peer = target_addr;
- if (st >= SRTS_BROKEN)
- {
- f->sndstate = SRT_GST_BROKEN;
- f->rcvstate = SRT_GST_BROKEN;
- epoll_remove_socket_INTERNAL(g.m_SndEID, ns);
- epoll_remove_socket_INTERNAL(g.m_RcvEID, ns);
- }
- else
- {
- f->sndstate = SRT_GST_PENDING;
- f->rcvstate = SRT_GST_PENDING;
- spawned[sid] = ns;
- sid_rloc = sid;
- erc_rloc = 0;
- retval = sid;
- }
- }
- }
- if (retval == -1)
- {
- HLOGC(aclog.Debug, log << "groupConnect: none succeeded as background-spawn, exit with error");
- block_new_opened = false; // Avoid executing further while loop
- }
- vector<SRTSOCKET> broken;
- while (block_new_opened)
- {
- if (spawned.empty())
- {
- // All were removed due to errors.
- retval = -1;
- break;
- }
- HLOGC(aclog.Debug, log << "groupConnect: first connection, applying EPOLL WAITING.");
- int len = (int)spawned.size();
- vector<SRTSOCKET> ready(spawned.size());
- const int estat = srt_epoll_wait(eid,
- NULL,
- NULL, // IN/ACCEPT
- &ready[0],
- &len, // OUT/CONNECT
- -1, // indefinitely (FIXME Check if it needs to REGARD CONNECTION TIMEOUT!)
- NULL,
- NULL,
- NULL,
- NULL);
- // Sanity check. Shouldn't happen if subs are in sync with spawned.
- if (estat == -1)
- {
- #if ENABLE_LOGGING
- CUDTException& x = CUDT::getlasterror();
- if (x.getErrorCode() != SRT_EPOLLEMPTY)
- {
- LOGC(aclog.Error,
- log << "groupConnect: srt_epoll_wait failed not because empty, unexpected IPE:"
- << x.getErrorMessage());
- }
- #endif
- HLOGC(aclog.Debug, log << "groupConnect: srt_epoll_wait failed - breaking the wait loop");
- retval = -1;
- break;
- }
- // At the moment when you are going to work with real sockets,
- // lock the groups so that no one messes up with something here
- // in the meantime.
- ScopedLock lock(*g.exp_groupLock());
- // NOTE: UNDER m_GroupLock, NO API FUNCTION CALLS DARE TO HAPPEN BELOW!
- // Check first if a socket wasn't closed in the meantime. It will be
- // automatically removed from all EIDs, but there's no sense in keeping
- // them in 'spawned' map.
- for (map<SRTSOCKET, CUDTSocket*>::iterator y = spawned.begin(); y != spawned.end(); ++y)
- {
- SRTSOCKET sid = y->first;
- if (y->second->getStatus() >= SRTS_BROKEN)
- {
- HLOGC(aclog.Debug,
- log << "groupConnect: Socket @" << sid
- << " got BROKEN in the meantine during the check, remove from candidates");
- // Remove from spawned and try again
- broken.push_back(sid);
- epoll_remove_socket_INTERNAL(eid, y->second);
- epoll_remove_socket_INTERNAL(g.m_SndEID, y->second);
- epoll_remove_socket_INTERNAL(g.m_RcvEID, y->second);
- }
- }
- // Remove them outside the loop because this can't be done
- // while iterating over the same container.
- for (size_t i = 0; i < broken.size(); ++i)
- spawned.erase(broken[i]);
- // Check the sockets if they were reported due
- // to have connected or due to have failed.
- // Distill successful ones. If distilled nothing, return -1.
- // If not all sockets were reported in this instance, repeat
- // the call until you get information about all of them.
- for (int i = 0; i < len; ++i)
- {
- map<SRTSOCKET, CUDTSocket*>::iterator x = spawned.find(ready[i]);
- if (x == spawned.end())
- {
- // Might be removed above - ignore it.
- continue;
- }
- SRTSOCKET sid = x->first;
- CUDTSocket* s = x->second;
- // Check status. If failed, remove from spawned
- // and try again.
- SRT_SOCKSTATUS st = s->getStatus();
- if (st >= SRTS_BROKEN)
- {
- HLOGC(aclog.Debug,
- log << "groupConnect: Socket @" << sid
- << " got BROKEN during background connect, remove & TRY AGAIN");
- // Remove from spawned and try again
- if (spawned.erase(sid))
- broken.push_back(sid);
- epoll_remove_socket_INTERNAL(eid, s);
- epoll_remove_socket_INTERNAL(g.m_SndEID, s);
- epoll_remove_socket_INTERNAL(g.m_RcvEID, s);
- continue;
- }
- if (st == SRTS_CONNECTED)
- {
- HLOGC(aclog.Debug,
- log << "groupConnect: Socket @" << sid << " got CONNECTED as first in the group - reporting");
- retval = sid;
- g.m_bConnected = true;
- block_new_opened = false; // Interrupt also rolling epoll (outer loop)
- // Remove this socket from SND EID because it doesn't need to
- // be connection-tracked anymore. Don't remove from the RCV EID
- // however because RCV procedure relies on epoll also for reading
- // and when found this socket connected it will "upgrade" it to
- // read-ready tracking only.
- epoll_remove_socket_INTERNAL(g.m_SndEID, s);
- break;
- }
- // Spurious?
- HLOGC(aclog.Debug,
- log << "groupConnect: Socket @" << sid << " got spurious wakeup in " << SockStatusStr(st)
- << " TRY AGAIN");
- }
- // END of m_GroupLock CS - you can safely use API functions now.
- }
- // Finished, delete epoll.
- if (eid != -1)
- {
- HLOGC(aclog.Debug, log << "connect FIRST IN THE GROUP finished, removing E" << eid);
- srt_epoll_release(eid);
- }
- for (vector<SRTSOCKET>::iterator b = broken.begin(); b != broken.end(); ++b)
- {
- CUDTSocket* s = locateSocket(*b, ERH_RETURN);
- if (!s)
- continue;
- // This will also automatically remove it from the group and all eids
- close(s);
- }
- // There's no possibility to report a problem on every connection
- // separately in case when every single connection has failed. What
- // is more interesting, it's only a matter of luck that all connections
- // fail at exactly the same time. OTOH if all are to fail, this
- // function will still be polling sockets to determine the last man
- // standing. Each one could, however, break by a different reason,
- // for example, one by timeout, another by wrong passphrase. Check
- // the `errorcode` field to determine the reason for a particular link.
- if (retval == -1)
- throw CUDTException(MJ_CONNECTION, MN_CONNLOST, 0);
- return retval;
- }
- #endif
- // Starts connecting socket `s` to `target_addr` while holding s->m_ControlLock.
- // - SRTS_INIT: the socket is opened and bound through updateMux() with an
- //   empty (auto-select) address of the target's family, then marked OPENED.
- // - SRTS_OPENED: the already-bound family must match the target's family.
- // - any other state: rejected with MN_ISCONNECTED.
- // Throws CUDTException for an unbound rendezvous socket, a wrong state, a
- // family mismatch, or when startConnect() throws (the status is then put back
- // to SRTS_OPENED before rethrowing). Returns 0 otherwise.
- int srt::CUDTUnited::connectIn(CUDTSocket* s, const sockaddr_any& target_addr, int32_t forced_isn)
- {
-     ScopedLock cg(s->m_ControlLock);
-     if (s->m_Status != SRTS_INIT)
-     {
-         // Not a fresh socket: only a bound-but-unconnected one may proceed.
-         if (s->m_Status != SRTS_OPENED)
-             throw CUDTException(MJ_NOTSUP, MN_ISCONNECTED, 0);
-         // status = SRTS_OPENED, so the local family is known already.
-         if (s->m_SelfAddr.family() != target_addr.family())
-         {
-             LOGP(cnlog.Error, "srt_connect: socket is bound to a different family than target address");
-             throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
-         }
-     }
-     else
-     {
-         if (s->core().m_config.bRendezvous)
-             throw CUDTException(MJ_NOTSUP, MN_ISRENDUNBOUND, 0);
-         // bind() was not called on this socket, so do the equivalent here,
-         // only with an empty address so that binding parameters are autoselected.
-         s->core().open();
-         // An address built from the family alone reports empty() == true.
-         sockaddr_any any_local(target_addr.family());
-         updateMux(s, any_local); // <<---- updateMux
-         // -> C(Snd|Rcv)Queue::init
-         // -> pthread_create(...C(Snd|Rcv)Queue::worker...)
-         s->m_Status = SRTS_OPENED;
-     }
-     // connect_complete() may run before connect() returns, so the status has
-     // to be switched first; otherwise CONNECTED could be overwritten by
-     // CONNECTING afterwards.
-     s->m_Status = SRTS_CONNECTING;
-     /*
-      * In blocking mode, connect can block for up to 30 seconds for
-      * rendez-vous mode. Holding the s->m_ControlLock prevents close
-      * from cancelling the connect.
-      */
-     try
-     {
-         // Remember the peer address before the handshake starts.
-         s->m_PeerAddr = target_addr;
-         s->core().startConnect(target_addr, forced_isn);
-     }
-     catch (const CUDTException&) // Interceptor, only restores the state.
-     {
-         s->m_Status = SRTS_OPENED;
-         throw;
-     }
-     return 0;
- }
- // Closes the entity identified by `u`.
- // With bonding enabled, an ID carrying SRTGROUP_MASK is treated as a group:
- // the group is closed and handed to deleteGroup(), and 0 is returned.
- // Otherwise the socket is looked up and closed through close(CUDTSocket*).
- // Throws CUDTException(MJ_NOTSUP, MN_SIDINVAL) when no socket is found.
- int srt::CUDTUnited::close(const SRTSOCKET u)
- {
- #if ENABLE_BONDING
-     if (u & SRTGROUP_MASK)
-     {
-         GroupKeeper keeper(*this, u, ERH_THROW);
-         keeper.group->close();
-         deleteGroup(keeper.group);
-         return 0;
-     }
- #endif
-     CUDTSocket* const sock = locateSocket(u);
-     if (sock == NULL)
-         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
-     return close(sock);
- }
- #if ENABLE_BONDING
- // Locking wrapper: takes m_GlobControlLock and delegates to deleteGroup_LOCKED().
- void srt::CUDTUnited::deleteGroup(CUDTGroup* g)
- {
-     using srt_logging::gmlog;
-     srt::sync::ScopedLock glob_guard(m_GlobControlLock);
-     deleteGroup_LOCKED(g);
- }
- // [[using locked(m_GlobControlLock)]]
- // Moves group `g` from m_Groups to m_ClosedGroups and clears any leftover
- // back-pointers to it held by sockets in m_Sockets and m_ClosedSockets.
- void srt::CUDTUnited::deleteGroup_LOCKED(CUDTGroup* g)
- {
-     SRT_ASSERT(g->groupEmpty());
-     // From here on GroupKeeper can no longer find the group.
-     m_Groups.erase(g->m_GroupID);
-     m_ClosedGroups[g->m_GroupID] = g;
-     // Paranoid check: once in m_ClosedGroups the group may be deleted at
-     // any time, so no socket may keep pointing at it. All sockets should
-     // have left the group already; a remaining reference is invalid.
-     for (sockets_t::iterator it = m_Sockets.begin(); it != m_Sockets.end(); ++it)
-     {
-         CUDTSocket* const member = it->second;
-         if (member->m_GroupOf != g)
-             continue;
-         HLOGC(smlog.Debug, log << "deleteGroup: IPE: existing @" << member->m_SocketID << " points to a dead group!");
-         member->m_GroupOf = NULL;
-         member->m_GroupMemberData = NULL;
-     }
-     // Same sweep over closed sockets, just in case; normally the link is
-     // dropped before a socket gets moved there.
-     for (sockets_t::iterator it = m_ClosedSockets.begin(); it != m_ClosedSockets.end(); ++it)
-     {
-         CUDTSocket* const member = it->second;
-         if (member->m_GroupOf != g)
-             continue;
-         HLOGC(smlog.Debug, log << "deleteGroup: IPE: closed @" << member->m_SocketID << " points to a dead group!");
-         member->m_GroupOf = NULL;
-         member->m_GroupMemberData = NULL;
-     }
- }
- #endif
- int srt::CUDTUnited::close(CUDTSocket* s) // Closes socket `s` (listener or non-listener); every normal return path yields 0.
- { // Listener: detached from listening immediately, rest left to GC. Other sockets: closed, then moved from m_Sockets to m_ClosedSockets.
- HLOGC(smlog.Debug, log << s->core().CONID() << "CLOSE. Acquiring control lock");
- ScopedLock socket_cg(s->m_ControlLock); // declared at function scope, so it covers everything below
- HLOGC(smlog.Debug, log << s->core().CONID() << "CLOSING (removing from listening, closing CUDT)");
- const bool synch_close_snd = s->core().m_config.bSynSending; // read up front; gates the sender-drain wait further down
- SRTSOCKET u = s->m_SocketID; // ID kept separately for lookups and logging
- if (s->m_Status == SRTS_LISTENING)
- {
- if (s->core().m_bBroken)
- return 0; // listener was already marked broken (this branch sets that flag below)
- s->m_tsClosureTimeStamp = steady_clock::now();
- s->core().m_bBroken = true;
- // Change towards original UDT:
- // Leave all the closing activities for garbageCollect to happen,
- // however remove the listener from the RcvQueue IMMEDIATELY.
- // Even though garbageCollect would eventually remove the listener
- // as well, there would be some time interval between now and the
- // moment when it's done, and during this time the application will
- // be unable to bind to this port that the about-to-delete listener
- // is currently occupying (due to blocked slot in the RcvQueue).
- HLOGC(smlog.Debug, log << s->core().CONID() << "CLOSING (removing listener immediately)");
- s->core().notListening();
- s->m_Status = SRTS_CLOSING;
- // broadcast all "accept" waiting
- CSync::lock_notify_all(s->m_AcceptCond, s->m_AcceptLock);
- }
- else
- {
- s->m_Status = SRTS_CLOSING;
- // Note: this call may be done on a socket that hasn't finished
- // sending all packets scheduled for sending, which means, this call
- // may block INDEFINITELY. As long as it's acceptable to block the
- // call to srt_close(), and all functions in all threads where this
- // very socket is used, this shall not block the central database.
- s->core().closeInternal();
- // synchronize with garbage collection.
- HLOGC(smlog.Debug,
- log << "@" << u << "U::close done. GLOBAL CLOSE: " << s->core().CONID()
- << "Acquiring GLOBAL control lock");
- ScopedLock manager_cg(m_GlobControlLock);
- // since "s" is located before m_GlobControlLock, locate it again in case
- // it became invalid
- // XXX This is very weird; if we state that the CUDTSocket object
- // could not be deleted between locks, then definitely it couldn't
- // also change the pointer value. There's no other reason for getting
- // this iterator but to obtain the 's' pointer, which is impossible to
- // be different than previous 's' (m_Sockets is a map that stores pointers
- // transparently). This iterator isn't even later used to delete the socket
- // from the container, though it would be more efficient.
- // FURTHER RESEARCH REQUIRED.
- sockets_t::iterator i = m_Sockets.find(u);
- if ((i == m_Sockets.end()) || (i->second->m_Status == SRTS_CLOSED))
- {
- HLOGC(smlog.Debug, log << "@" << u << "U::close: NOT AN ACTIVE SOCKET, returning.");
- return 0;
- }
- s = i->second;
- s->setClosed();
- #if ENABLE_BONDING
- if (s->m_GroupOf)
- {
- HLOGC(smlog.Debug,
- log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
- s->removeFromGroup(true);
- }
- #endif
- m_Sockets.erase(s->m_SocketID); // hand the socket over: out of the active map...
- m_ClosedSockets[s->m_SocketID] = s; // ...and into the closed map
- HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");
- CGlobEvent::triggerEvent();
- }
- HLOGC(smlog.Debug, log << "@" << u << ": GLOBAL: CLOSING DONE");
- // Check if the ID is still in closed sockets before you access it
- // (the last triggerEvent could have deleted it).
- if (synch_close_snd)
- {
- #if SRT_ENABLE_CLOSE_SYNCH
- HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sync-waiting for releasing sender resources...");
- for (;;)
- {
- CSndBuffer* sb = s->core().m_pSndBuffer; // NOTE(review): `s` is dereferenced before the m_ClosedSockets check below - confirm it cannot be freed yet
- // Disconnected from buffer - nothing more to check.
- if (!sb)
- {
- HLOGC(smlog.Debug,
- log << "@" << u << " GLOBAL CLOSING: sending buffer disconnected. Allowed to close.");
- break;
- }
- // Sender buffer empty
- if (sb->getCurrBufSize() == 0)
- {
- HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: sending buffer depleted. Allowed to close.");
- break;
- }
- // Ok, now you are keeping GC thread hands off the internal data.
- // You can check then if it has already deleted the socket or not.
- // The socket is either in m_ClosedSockets or is already gone.
- // Done the other way, but still done. You can stop waiting.
- bool isgone = false;
- {
- ScopedLock manager_cg(m_GlobControlLock);
- isgone = m_ClosedSockets.count(u) == 0;
- }
- if (!isgone)
- {
- isgone = !s->core().m_bOpened; // a core that is no longer opened also counts as gone
- }
- if (isgone)
- {
- HLOGC(smlog.Debug,
- log << "@" << u << " GLOBAL CLOSING: ... gone in the meantime, whatever. Exiting close().");
- break;
- }
- HLOGC(smlog.Debug, log << "@" << u << " GLOBAL CLOSING: ... still waiting for any update.");
- // How to handle a possible error here?
- CGlobEvent::waitForEvent();
- // Continue waiting in case when an event happened or 1s waiting time passed for checkpoint.
- }
- #endif
- }
- /*
- This code is PUT ASIDE for now.
- Most likely this will be never required.
- It had to hold the closing activity until the time when the receiver buffer is depleted.
- However the closing of the socket should only happen when the receiver has received
- an information about that the reading is no longer possible (error report from recv/recvfile).
- When this happens, the receiver buffer is definitely depleted already and there's no need to check
- anything.
- Should there appear any other conditions in future under which the closing process should be
- delayed until the receiver buffer is empty, this code can be filled here.
- if ( synch_close_rcv )
- {
- ...
- }
- */
- CSync::notify_one_relaxed(m_GCStopCond); // NOTE(review): presumably nudges the GC thread to run a pass - confirm
- return 0;
- }
- // Copies the peer address of socket `u` into `pw_name` and writes its size to `*pw_namelen`.
- // Throws CUDTException:
- //  - MN_INVAL    for a null argument or a buffer smaller than the address,
- //  - MN_NOCONN   when the socket is not in SRTS_CONNECTED, or its core is
- //                not connected or is broken,
- //  - MN_SIDINVAL when the socket cannot be located.
- void srt::CUDTUnited::getpeername(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
- {
-     if (pw_name == NULL || pw_namelen == NULL)
-         throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
-     if (getStatus(u) != SRTS_CONNECTED)
-         throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
-     CUDTSocket* const sock = locateSocket(u);
-     if (sock == NULL)
-         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
-     // The status check above is repeated against the core's own flags.
-     const bool usable = sock->core().m_bConnected && !sock->core().m_bBroken;
-     if (!usable)
-         throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
-     const int addrlen = sock->m_PeerAddr.size();
-     if (addrlen > *pw_namelen)
-         throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
-     memcpy((pw_name), &sock->m_PeerAddr.sa, addrlen);
-     *pw_namelen = addrlen;
- }
- // Copies the local address of socket `u` into `pw_name` and writes its size to `*pw_namelen`.
- // Throws CUDTException:
- //  - MN_INVAL    for a null argument or a buffer smaller than the address,
- //  - MN_SIDINVAL when the socket cannot be located or its core is broken,
- //  - MN_NOCONN   when the socket is still in SRTS_INIT.
- void srt::CUDTUnited::getsockname(const SRTSOCKET u, sockaddr* pw_name, int* pw_namelen)
- {
-     if (pw_name == NULL || pw_namelen == NULL)
-         throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
-     CUDTSocket* const sock = locateSocket(u);
-     if (sock == NULL)
-         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
-     if (sock->core().m_bBroken)
-         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
-     if (sock->m_Status == SRTS_INIT)
-         throw CUDTException(MJ_CONNECTION, MN_NOCONN, 0);
-     const int addrlen = sock->m_SelfAddr.size();
-     if (addrlen > *pw_namelen)
-         throw CUDTException(MJ_NOTSUP, MN_INVAL, 0);
-     memcpy((pw_name), &sock->m_SelfAddr.sa, addrlen);
-     *pw_namelen = addrlen;
- }
- // select()-style readiness check over up to three socket ID sets.
- // Sockets whose status is SRTS_BROKEN are reported right away in the set
- // they were passed in; the remaining ones are polled for read/write
- // readiness (a SRTS_CLOSED socket counts as ready) until something is found
- // or the timeout has elapsed. On return every non-null input set is
- // overwritten with the IDs found for it. Returns the number of reports.
- // Throws CUDTException(MJ_NOTSUP, MN_SIDINVAL) for an ID that cannot be located.
- // NOTE(review): a null `timeout` yields a -1 us duration, so the poll loop
- // runs exactly once instead of waiting - confirm this is intended.
- int srt::CUDTUnited::select(UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout)
- {
-     const steady_clock::time_point entertime = steady_clock::now();
-     int64_t timeo_us = -1;
-     if (timeout)
-         timeo_us = static_cast<int64_t>(timeout->tv_sec) * 1000000 + timeout->tv_usec;
-     const steady_clock::duration timeo(microseconds_from(timeo_us));
-     // Result accumulators.
-     int count = 0;
-     set<SRTSOCKET> rs, ws, es;
-     // Sockets that still have to be polled, per category.
-     vector<CUDTSocket*> ru, wu, eu;
-     CUDTSocket* s;
-     if (readfds)
-     {
-         for (set<SRTSOCKET>::iterator rit = readfds->begin(); rit != readfds->end(); ++rit)
-         {
-             if (getStatus(*rit) == SRTS_BROKEN)
-             {
-                 rs.insert(*rit);
-                 ++count;
-                 continue;
-             }
-             s = locateSocket(*rit);
-             if (!s)
-                 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
-             ru.push_back(s);
-         }
-     }
-     if (writefds)
-     {
-         for (set<SRTSOCKET>::iterator wit = writefds->begin(); wit != writefds->end(); ++wit)
-         {
-             if (getStatus(*wit) == SRTS_BROKEN)
-             {
-                 ws.insert(*wit);
-                 ++count;
-                 continue;
-             }
-             s = locateSocket(*wit);
-             if (!s)
-                 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
-             wu.push_back(s);
-         }
-     }
-     if (exceptfds)
-     {
-         for (set<SRTSOCKET>::iterator eit = exceptfds->begin(); eit != exceptfds->end(); ++eit)
-         {
-             if (getStatus(*eit) == SRTS_BROKEN)
-             {
-                 es.insert(*eit);
-                 ++count;
-                 continue;
-             }
-             s = locateSocket(*eit);
-             if (!s)
-                 throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
-             eu.push_back(s);
-         }
-     }
-     do
-     {
-         // Readable sockets.
-         for (vector<CUDTSocket*>::iterator rp = ru.begin(); rp != ru.end(); ++rp)
-         {
-             s = *rp;
-             if (s->readReady() || s->m_Status == SRTS_CLOSED)
-             {
-                 rs.insert(s->m_SocketID);
-                 ++count;
-             }
-         }
-         // Writable sockets.
-         for (vector<CUDTSocket*>::iterator wp = wu.begin(); wp != wu.end(); ++wp)
-         {
-             s = *wp;
-             if (s->writeReady() || s->m_Status == SRTS_CLOSED)
-             {
-                 ws.insert(s->m_SocketID);
-                 ++count;
-             }
-         }
-         // Exceptions on sockets.
-         for (vector<CUDTSocket*>::iterator ep = eu.begin(); ep != eu.end(); ++ep)
-         {
-             // check connection request status, not supported now
-         }
-         if (count > 0)
-             break;
-         CGlobEvent::waitForEvent();
-     } while (timeo > steady_clock::now() - entertime);
-     if (readfds)
-         *readfds = rs;
-     if (writefds)
-         *writefds = ws;
-     if (exceptfds)
-         *exceptfds = es;
-     return count;
- }
- // Vector-based variant of select(): scans every ID in `fds` and appends it
- // to `readfds`, `writefds` or `exceptfds` (each optional, cleared on entry)
- // according to its state. An ID that cannot be located, is broken, or is
- // SRTS_CLOSED goes to `exceptfds`. The scan repeats, waiting for a global
- // event in between, until something was reported or `msTimeOut`
- // milliseconds have passed. Returns the number of entries appended.
- // NOTE(review): a negative `msTimeOut` yields a -1 us duration, so the scan
- // runs exactly once instead of waiting - confirm this is intended.
- int srt::CUDTUnited::selectEx(const vector<SRTSOCKET>& fds,
-                              vector<SRTSOCKET>* readfds,
-                              vector<SRTSOCKET>* writefds,
-                              vector<SRTSOCKET>* exceptfds,
-                              int64_t msTimeOut)
- {
-     const steady_clock::time_point entertime = steady_clock::now();
-     int64_t timeo_us = -1;
-     if (msTimeOut >= 0)
-         timeo_us = msTimeOut * 1000;
-     const steady_clock::duration timeo(microseconds_from(timeo_us));
-     // Reset the outputs.
-     int count = 0;
-     if (readfds)
-         readfds->clear();
-     if (writefds)
-         writefds->clear();
-     if (exceptfds)
-         exceptfds->clear();
-     do
-     {
-         for (vector<SRTSOCKET>::const_iterator fd = fds.begin(); fd != fds.end(); ++fd)
-         {
-             CUDTSocket* const sock = locateSocket(*fd);
-             const bool unusable = (!sock) || sock->core().m_bBroken || (sock->m_Status == SRTS_CLOSED);
-             if (unusable)
-             {
-                 if (exceptfds)
-                 {
-                     exceptfds->push_back(*fd);
-                     ++count;
-                 }
-                 continue;
-             }
-             // Readable: connected with data ready, or listening with queued sockets.
-             if (readfds &&
-                 ((sock->core().m_bConnected && sock->core().m_pRcvBuffer->isRcvDataReady()) ||
-                  (sock->core().m_bListening && (sock->m_QueuedSockets.size() > 0))))
-             {
-                 readfds->push_back(sock->m_SocketID);
-                 ++count;
-             }
-             // Writable: connected with the send buffer below its configured size.
-             if (writefds && sock->core().m_bConnected &&
-                 (sock->core().m_pSndBuffer->getCurrBufSize() < sock->core().m_config.iSndBufSize))
-             {
-                 writefds->push_back(sock->m_SocketID);
-                 ++count;
-             }
-         }
-         if (count > 0)
-             break;
-         CGlobEvent::waitForEvent();
-     } while (timeo > steady_clock::now() - entertime);
-     return count;
- }
- // Forwards to m_EPoll.create() and returns its result.
- int srt::CUDTUnited::epoll_create()
- {
-     const int result = m_EPoll.create();
-     return result;
- }
- // Forwards to m_EPoll.clear_usocks() for container `eid` and returns its result.
- int srt::CUDTUnited::epoll_clear_usocks(int eid)
- {
-     const int result = m_EPoll.clear_usocks(eid);
-     return result;
- }
- // Subscribes socket or group `u` to epoll container `eid` for `events`.
- // With bonding enabled, an ID carrying SRTGROUP_MASK is handled as a group
- // (looked up with ERH_THROW); otherwise the socket is located and passed to
- // epoll_add_usock_INTERNAL().
- // Returns the result of the underlying epoll update in both cases.
- // Throws CUDTException(MJ_NOTSUP, MN_SIDINVAL) when no socket is found.
- int srt::CUDTUnited::epoll_add_usock(const int eid, const SRTSOCKET u, const int* events)
- {
- #if ENABLE_BONDING
-     if (u & SRTGROUP_MASK)
-     {
-         GroupKeeper k(*this, u, ERH_THROW);
-         const int ret = m_EPoll.update_usock(eid, u, events);
-         k.group->addEPoll(eid);
-         // Report the update result like the socket path does, rather than
-         // computing it and then returning a hard-coded 0.
-         return ret;
-     }
- #endif
-     CUDTSocket* s = locateSocket(u);
-     if (!s)
-         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL);
-     return epoll_add_usock_INTERNAL(eid, s, events);
- }
- // Registers socket `s` in epoll container `eid` for `events`, then records
- // `eid` on the socket's core. Returns the result of the epoll update.
- // NOTE: WILL LOCK (serially):
- // - CEPoll::m_EPollLock
- // - CUDT::m_RecvLock
- int srt::CUDTUnited::epoll_add_usock_INTERNAL(const int eid, CUDTSocket* s, const int* events)
- {
-     const int update_result = m_EPoll.update_usock(eid, s->m_SocketID, events);
-     s->core().addEPoll(eid);
-     return update_result;
- }
- // Forwards to m_EPoll.add_ssock() for system socket `s` and returns its result.
- int srt::CUDTUnited::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events)
- {
-     const int result = m_EPoll.add_ssock(eid, s, events);
-     return result;
- }
- // Forwards to m_EPoll.update_ssock() for system socket `s` and returns its result.
- int srt::CUDTUnited::epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events)
- {
-     const int result = m_EPoll.update_ssock(eid, s, events);
-     return result;
- }
- // Detaches entity `ent` from epoll container `eid` in three ordered steps:
- // removes its epoll events for `eid`, removes `eid` from the entity itself,
- // and finally updates the epoll system with an empty event set.
- // Returns the result of that final update.
- template <class EntityType>
- int srt::CUDTUnited::epoll_remove_entity(const int eid, EntityType* ent)
- {
-     // XXX Possibly unnecessary: setting readiness to false does not
-     // trigger any action by itself. Further research needed.
-     HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING readiness on E" << eid << " of @" << ent->id());
-     ent->removeEPollEvents(eid);
-     // The EID is taken out of the entity's subscribers first, so that a
-     // concurrent update_events:
-     // - if it runs before this call, can still find the epoll bit update possible
-     // - if it runs after this call, will not touch this EID
-     HLOGC(ealog.Debug, log << "epoll_remove_usock: REMOVING E" << eid << " from back-subscirbers in @" << ent->id());
-     ent->removeEPollID(eid);
-     HLOGC(ealog.Debug, log << "epoll_remove_usock: CLEARING subscription on E" << eid << " of @" << ent->id());
-     int empty_events = 0;
-     return m_EPoll.update_usock(eid, ent->id(), &empty_events);
- }
- // Needed internal access!
- // Removes the core of socket `s` from epoll container `eid` through epoll_remove_entity().
- int srt::CUDTUnited::epoll_remove_socket_INTERNAL(const int eid, CUDTSocket* s)
- {
-     const int result = epoll_remove_entity(eid, &s->core());
-     return result;
- }
- #if ENABLE_BONDING
- // Removes group `g` from epoll container `eid` through epoll_remove_entity().
- int srt::CUDTUnited::epoll_remove_group_INTERNAL(const int eid, CUDTGroup* g)
- {
-     const int result = epoll_remove_entity(eid, g);
-     return result;
- }
- #endif
- // Unsubscribes socket or group `u` from epoll container `eid`.
- // With bonding enabled, an ID carrying SRTGROUP_MASK is resolved as a group
- // (ERH_THROW); otherwise the socket is located. When the socket lookup
- // returns nothing, an error is logged and only the epoll system entry is
- // cleared. Returns the result of the respective removal call.
- int srt::CUDTUnited::epoll_remove_usock(const int eid, const SRTSOCKET u)
- {
- #if ENABLE_BONDING
-     if (u & SRTGROUP_MASK)
-     {
-         GroupKeeper keeper(*this, u, ERH_THROW);
-         CUDTGroup* const grp = keeper.group;
-         return epoll_remove_entity(eid, grp);
-     }
- #endif
-     CUDTSocket* const sock = locateSocket(u);
-     if (sock)
-         return epoll_remove_entity(eid, &sock->core());
-     LOGC(ealog.Error,
-          log << "remove_usock: @" << u << " not found as either socket or group. Removing only from epoll system.");
-     int empty_events = 0;
-     return m_EPoll.update_usock(eid, u, &empty_events);
- }
- // Forwards to m_EPoll.remove_ssock() for system socket `s` and returns its result.
- int srt::CUDTUnited::epoll_remove_ssock(const int eid, const SYSSOCKET s)
- {
-     const int result = m_EPoll.remove_ssock(eid, s);
-     return result;
- }
- // Forwards to m_EPoll.uwait() with the caller's event array and timeout and returns its result.
- int srt::CUDTUnited::epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
- {
-     const int result = m_EPoll.uwait(eid, fdsSet, fdsSize, msTimeOut);
-     return result;
- }
- // Forwards to m_EPoll.setflags() for container `eid` and returns its result.
- int32_t srt::CUDTUnited::epoll_set(int eid, int32_t flags)
- {
-     const int32_t result = m_EPoll.setflags(eid, flags);
-     return result;
- }
- // Forwards to m_EPoll.release() for container `eid` and returns its result.
- int srt::CUDTUnited::epoll_release(const int eid)
- {
-     const int result = m_EPoll.release(eid);
-     return result;
- }
- // Looks up socket `u` under m_GlobControlLock via locateSocket_LOCKED().
- // When nothing is found: returns NULL if `erh` is ERH_RETURN, otherwise
- // throws CUDTException(MJ_NOTSUP, MN_SIDINVAL).
- srt::CUDTSocket* srt::CUDTUnited::locateSocket(const SRTSOCKET u, ErrorHandling erh)
- {
-     ScopedLock cg(m_GlobControlLock);
-     CUDTSocket* const found = locateSocket_LOCKED(u);
-     if (found)
-         return found;
-     if (erh == ERH_RETURN)
-         return NULL;
-     throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
- }
- // [[using locked(m_GlobControlLock)]];
- // Returns the socket stored in m_Sockets under `u`, or NULL when there is
- // no such entry or the socket's status is SRTS_CLOSED.
- srt::CUDTSocket* srt::CUDTUnited::locateSocket_LOCKED(SRTSOCKET u)
- {
-     sockets_t::iterator pos = m_Sockets.find(u);
-     if ((pos != m_Sockets.end()) && (pos->second->m_Status != SRTS_CLOSED))
-         return pos->second;
-     return NULL;
- }
- #if ENABLE_BONDING
- // Finds group `u` in m_Groups under m_GlobControlLock and, with the group's
- // own lock held, calls apiAcquire() on it before returning it.
- // When the group is not found: throws CUDTException(MJ_NOTSUP, MN_SIDINVAL)
- // if `erh` is ERH_THROW, otherwise returns NULL.
- srt::CUDTGroup* srt::CUDTUnited::locateAcquireGroup(SRTSOCKET u, ErrorHandling erh)
- {
-     ScopedLock cg(m_GlobControlLock);
-     const groups_t::iterator pos = m_Groups.find(u);
-     if (pos != m_Groups.end())
-     {
-         ScopedLock cgroup(*pos->second->exp_groupLock());
-         pos->second->apiAcquire();
-         return pos->second;
-     }
-     if (erh == ERH_THROW)
-         throw CUDTException(MJ_NOTSUP, MN_SIDINVAL, 0);
-     return NULL;
- }
- // Returns the group socket `s` belongs to after calling apiAcquire() on it,
- // or NULL when the socket has no group. Runs under m_GlobControlLock.
- srt::CUDTGroup* srt::CUDTUnited::acquireSocketsGroup(CUDTSocket* s)
- {
-     ScopedLock cg(m_GlobControlLock);
-     CUDTGroup* const owner = s->m_GroupOf;
-     // With m_GlobControlLock locked, the group is sure to still exist
-     // as long as it has not been detached from this socket.
-     if (owner)
-         owner->apiAcquire();
-     return owner;
- }
- #endif
- // Searches m_PeerRec, keyed by CUDTSocket::getPeerSpec(id, isn), for a
- // socket that is still in m_Sockets and whose m_PeerAddr equals `peer`.
- // Returns that socket, or NULL when there is no match.
- // Runs under m_GlobControlLock.
- srt::CUDTSocket* srt::CUDTUnited::locatePeer(const sockaddr_any& peer, const SRTSOCKET id, int32_t isn)
- {
-     ScopedLock cg(m_GlobControlLock);
-     map<int64_t, set<SRTSOCKET> >::iterator rec = m_PeerRec.find(CUDTSocket::getPeerSpec(id, isn));
-     if (rec == m_PeerRec.end())
-         return NULL;
-     for (set<SRTSOCKET>::iterator cand = rec->second.begin(); cand != rec->second.end(); ++cand)
-     {
-         sockets_t::iterator pos = m_Sockets.find(*cand);
-         // A missing entry means the socket was closed and moved to m_ClosedSockets.
-         if ((pos != m_Sockets.end()) && (pos->second->m_PeerAddr == peer))
-             return pos->second;
-     }
-     return NULL;
- }
- void srt::CUDTUnited::checkBrokenSockets() // Cleanup pass over closed groups, broken sockets and closed sockets.
- { // Deletes non-busy closed groups, moves broken sockets to m_ClosedSockets, and calls removeSocket() on closed sockets older than 1 s that are off the receive list.
- ScopedLock cg(m_GlobControlLock); // declared at function scope, so it covers the whole pass
- #if ENABLE_BONDING
- vector<SRTSOCKET> delgids; // IDs of groups deleted in this pass; erased from the map after the loop
- for (groups_t::iterator i = m_ClosedGroups.begin(); i != m_ClosedGroups.end(); ++i)
- {
- // isStillBusy requires lock on the group, so only after an API
- // function that uses it returns, and so clears the busy flag,
- // a new API function won't be called anyway until it can acquire
- // GlobControlLock, and all functions that have already seen this
- // group as closing will not continue with the API and return.
- // If we caught some API function still using the closed group,
- // it's not going to wait, will be checked next time.
- if (i->second->isStillBusy())
- continue;
- delgids.push_back(i->first);
- delete i->second;
- i->second = NULL; // just for a case, avoid a dangling pointer
- }
- for (vector<SRTSOCKET>::iterator di = delgids.begin(); di != delgids.end(); ++di)
- {
- m_ClosedGroups.erase(*di);
- }
- #endif
- // set of sockets To Be Closed and To Be Removed
- vector<SRTSOCKET> tbc;
- vector<SRTSOCKET> tbr;
- for (sockets_t::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++i)
- {
- CUDTSocket* s = i->second;
- if (!s->core().m_bBroken)
- continue; // only sockets flagged broken are handled by this loop
- if (s->m_Status == SRTS_LISTENING)
- {
- const steady_clock::duration elapsed = steady_clock::now() - s->m_tsClosureTimeStamp;
- // A listening socket should wait an extra 3 seconds
- // in case a client is connecting.
- if (elapsed < milliseconds_from(CUDT::COMM_CLOSE_BROKEN_LISTENER_TIMEOUT_MS))
- continue;
- }
- else if ((s->core().m_pRcvBuffer != NULL)
- // FIXED: calling isRcvDataAvailable() just to get the information
- // whether there are any data waiting in the buffer,
- // NOT WHETHER THEY ARE ALSO READY TO PLAY at the time when
- // this function is called (isRcvDataReady also checks if the
- // available data is "ready to play").
- && s->core().m_pRcvBuffer->hasAvailablePackets())
- {
- const int bc = s->core().m_iBrokenCounter.load();
- if (bc > 0)
- {
- // if there is still data in the receiver buffer, wait longer
- s->core().m_iBrokenCounter.store(bc - 1); // one grace pass used up per call
- continue;
- }
- }
- #if ENABLE_BONDING
- if (s->m_GroupOf)
- {
- HLOGC(smlog.Debug,
- log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
- s->removeFromGroup(true);
- }
- #endif
- HLOGC(smlog.Debug, log << "checkBrokenSockets: moving BROKEN socket to CLOSED: @" << i->first);
- // close broken connections and start removal timer
- s->setClosed();
- tbc.push_back(i->first); // the m_Sockets entry is erased after the loop, not while iterating
- m_ClosedSockets[i->first] = s;
- // remove from listener's queue
- sockets_t::iterator ls = m_Sockets.find(s->m_ListenSocket);
- if (ls == m_Sockets.end())
- {
- ls = m_ClosedSockets.find(s->m_ListenSocket);
- if (ls == m_ClosedSockets.end())
- continue; // listener found in neither map: nothing to unqueue
- }
- enterCS(ls->second->m_AcceptLock);
- ls->second->m_QueuedSockets.erase(s->m_SocketID);
- leaveCS(ls->second->m_AcceptLock);
- }
- for (sockets_t::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++j)
- {
- // HLOGC(smlog.Debug, log << "checking CLOSED socket: " << j->first);
- if (!is_zero(j->second->core().m_tsLingerExpiration))
- {
- // asynchronous close:
- if ((!j->second->core().m_pSndBuffer) || (0 == j->second->core().m_pSndBuffer->getCurrBufSize()) ||
- (j->second->core().m_tsLingerExpiration <= steady_clock::now()))
- {
- HLOGC(smlog.Debug, log << "checkBrokenSockets: marking CLOSED qualified @" << j->second->m_SocketID);
- j->second->core().m_tsLingerExpiration = steady_clock::time_point(); // reset so this branch is skipped next time
- j->second->core().m_bClosing = true;
- j->second->m_tsClosureTimeStamp = steady_clock::now(); // restarts the 1 s removal delay checked below
- }
- }
- // timeout 1 second to destroy a socket AND it has been removed from
- // RcvUList
- const steady_clock::time_point now = steady_clock::now();
- const steady_clock::duration closed_ago = now - j->second->m_tsClosureTimeStamp;
- if (closed_ago > seconds_from(1))
- {
- CRNode* rnode = j->second->core().m_pRNode;
- if (!rnode || !rnode->m_bOnList)
- {
- HLOGC(smlog.Debug,
- log << "checkBrokenSockets: @" << j->second->m_SocketID << " closed "
- << FormatDuration(closed_ago) << " ago and removed from RcvQ - will remove");
- // HLOGC(smlog.Debug, log << "will unref socket: " << j->first);
- tbr.push_back(j->first);
- }
- }
- }
- // move closed sockets to the ClosedSockets structure
- for (vector<SRTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++k)
- m_Sockets.erase(*k);
- // remove those timeout sockets
- for (vector<SRTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++l)
- removeSocket(*l);
- HLOGC(smlog.Debug, log << "checkBrokenSockets: after removal: m_ClosedSockets.size()=" << m_ClosedSockets.size());
- }
- // [[using locked(m_GlobControlLock)]]
- // Final disposal of a socket that sits in m_ClosedSockets: the socket is
- // detached from its group (if any), its still-queued unaccepted sockets are
- // broken and moved to m_ClosedSockets, its peer record and epoll events are
- // cleared, and the socket object is closed and deleted. Afterwards one
- // reference is dropped from the socket's multiplexer, which is destroyed and
- // erased when the count reaches zero.
- // The call returns early without any change when 'u' is not found in
- // m_ClosedSockets, or when the socket's sender/receiver nodes indicate it is
- // still in use by the queues.
- void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
- {
- sockets_t::iterator i = m_ClosedSockets.find(u);
- // invalid socket ID
- if (i == m_ClosedSockets.end())
- return;
- CUDTSocket* const s = i->second;
- // The socket may be in the trashcan now, but could
- // still be under processing in the sender/receiver worker
- // threads. If that's the case, SKIP IT THIS TIME. The
- // socket will be checked next time the GC rollover starts.
- CSNode* sn = s->core().m_pSNode;
- if (sn && sn->m_iHeapLoc != -1)
- return;
- CRNode* rn = s->core().m_pRNode;
- if (rn && rn->m_bOnList)
- return;
- #if ENABLE_BONDING
- if (s->m_GroupOf)
- {
- HLOGC(smlog.Debug,
- log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id() << " - REMOVING FROM GROUP");
- s->removeFromGroup(true);
- }
- #endif
- // decrease multiplexer reference count, and remove it if necessary
- // (the ID is saved now because 's' is deleted before the muxer is looked up)
- const int mid = s->m_iMuxID;
- {
- ScopedLock cg(s->m_AcceptLock);
- // if it is a listener, close all un-accepted sockets in its queue
- // and remove them later
- for (set<SRTSOCKET>::iterator q = s->m_QueuedSockets.begin(); q != s->m_QueuedSockets.end(); ++q)
- {
- sockets_t::iterator si = m_Sockets.find(*q);
- if (si == m_Sockets.end())
- {
- // gone in the meantime
- LOGC(smlog.Error,
- log << "removeSocket: IPE? socket @" << (*q) << " being queued for listener socket @"
- << s->m_SocketID << " is GONE in the meantime ???");
- continue;
- }
- CUDTSocket* as = si->second;
- as->breakSocket_LOCKED();
- // Move the queued socket from the active map to the closed map.
- m_ClosedSockets[*q] = as;
- m_Sockets.erase(*q);
- }
- }
- // remove from peer rec
- map<int64_t, set<SRTSOCKET> >::iterator j = m_PeerRec.find(s->getPeerSpec());
- if (j != m_PeerRec.end())
- {
- j->second.erase(u);
- // Drop the whole record once no socket refers to this peer spec.
- if (j->second.empty())
- m_PeerRec.erase(j);
- }
- /*
- * Socket may be deleted while still having ePoll events set that would
- * remain forever, causing epoll_wait to unblock continuously for nonexistent
- * sockets. Get rid of all events for this socket.
- */
- m_EPoll.update_events(u, s->core().m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, false);
- // delete this one
- m_ClosedSockets.erase(i);
- HLOGC(smlog.Debug, log << "GC/removeSocket: closing associated UDT @" << u);
- s->core().closeInternal();
- HLOGC(smlog.Debug, log << "GC/removeSocket: DELETING SOCKET @" << u);
- delete s;
- // NOTE: 's' must not be used below this point; only 'u' and 'mid' remain valid.
- HLOGC(smlog.Debug, log << "GC/removeSocket: socket @" << u << " DELETED. Checking muxer.");
- if (mid == -1)
- {
- HLOGC(smlog.Debug, log << "GC/removeSocket: no muxer found, finishing.");
- return;
- }
- map<int, CMultiplexer>::iterator m;
- m = m_mMultiplexer.find(mid);
- if (m == m_mMultiplexer.end())
- {
- LOGC(smlog.Fatal, log << "IPE: For socket @" << u << " MUXER id=" << mid << " NOT FOUND!");
- return;
- }
- CMultiplexer& mx = m->second;
- mx.m_iRefCount--;
- HLOGC(smlog.Debug, log << "unrefing underlying muxer " << mid << " for @" << u << ", ref=" << mx.m_iRefCount);
- if (0 == mx.m_iRefCount)
- {
- HLOGC(smlog.Debug,
- log << "MUXER id=" << mid << " lost last socket @" << u << " - deleting muxer bound to port "
- << mx.m_pChannel->bindAddressAny().hport());
- // The channel has no access to the queues and
- // it looks like the multiplexer is the master of all of them.
- // The queues must be silenced before closing the channel
- // because this will cause error to be returned in any operation
- // being currently done in the queues, if any.
- mx.m_pSndQueue->setClosing();
- mx.m_pRcvQueue->setClosing();
- mx.destroy();
- m_mMultiplexer.erase(m);
- }
- }
- // Fills in the basic fields of a multiplexer 'w_m' that is being created
- // on behalf of socket 's', for address family 'af'.
- void srt::CUDTUnited::configureMuxer(CMultiplexer& w_m, const CUDTSocket* s, int af)
- {
-     // The multiplexer takes over the ID of the socket it is created for.
-     w_m.m_iID        = s->m_SocketID;
-     w_m.m_iIPversion = af;
-
-     // The creating socket is the first reference holder.
-     w_m.m_iRefCount = 1;
-
-     // Multiplexer-level options come from the socket's configuration.
-     w_m.m_mcfg = s->core().m_config;
- }
- // Wires socket 'w_s' to the multiplexer 'fw_sm': the socket gets the
- // multiplexer's send/receive queues and ID, and its own address is set to
- // the address reported by the multiplexer's channel.
- // Returns the port (host byte order) of that channel address.
- uint16_t srt::CUDTUnited::installMuxer(CUDTSocket* w_s, CMultiplexer& fw_sm)
- {
-     w_s->m_iMuxID = fw_sm.m_iID;
-
-     CUDT& core       = w_s->core();
-     core.m_pSndQueue = fw_sm.m_pSndQueue;
-     core.m_pRcvQueue = fw_sm.m_pRcvQueue;
-
-     // Ask the channel which address it is actually bound to. The socket's
-     // address will be completed later as well, but it is needed already
-     // here for later checks.
-     sockaddr_any bound;
-     fw_sm.m_pChannel->getSockAddr((bound));
-     w_s->m_SelfAddr = bound;
-
-     return bound.hport();
- }
- // Checks whether the IPv6-only setting of a socket allows it to share an
- // existing multiplexer.
- //   muxaddr   - address the multiplexer is bound to
- //   cfgMuxer  - configuration of the multiplexer
- //   reqaddr   - address requested for binding by the socket
- //   cfgSocket - configuration of the socket
- // Returns false only when the multiplexer is IPv6, the requested address is
- // a wildcard, and the socket has an explicit iIpV6Only value that differs
- // from the multiplexer's one.
- bool srt::CUDTUnited::inet6SettingsCompat(const sockaddr_any& muxaddr, const CSrtMuxerConfig& cfgMuxer,
- const sockaddr_any& reqaddr, const CSrtMuxerConfig& cfgSocket)
- {
-     // Non-IPv6 multiplexer: nothing to check, the family has been checked already.
-     // Binding to a certain IPv6 address: the setting doesn't matter.
-     if (muxaddr.family() != AF_INET6 || !reqaddr.isany())
-         return true;
-
-     // Wildcard request: -1 is treated as "adaptive"; an explicitly set value
-     // must be equal to the one of the found muxer.
-     return cfgSocket.iIpV6Only == -1 || cfgSocket.iIpV6Only == cfgMuxer.iIpV6Only;
- }
- // Tells whether a socket configured with 'cfgSocket' may share the
- // multiplexer configured with 'cfgMuxer': the multiplexer must have
- // bReuseAddr set and its configuration must be compatible with the socket's.
- bool srt::CUDTUnited::channelSettingsMatch(const CSrtMuxerConfig& cfgMuxer, const CSrtConfig& cfgSocket)
- {
-     if (cfgMuxer.bReuseAddr)
-     {
-         if (!cfgMuxer.isCompatWith(cfgSocket))
-         {
-             HLOGP(smlog.Debug, "channelSettingsMatch: fail: some options have different values");
-             return false;
-         }
-         return true;
-     }
-
-     HLOGP(smlog.Debug, "channelSettingsMatch: fail: the multiplexer is not reusable");
-     return false;
- }
- // Assigns a multiplexer to socket 's' for the requested binding 'reqaddr'.
- // When 'udpsock' is NULL, the existing multiplexers bound to the same port
- // are examined first: a compatible one is reused (its reference count is
- // incremented and the function returns), while an overlapping but
- // incompatible one makes the call throw CUDTException(MJ_NOTSUP, MN_BUSYPORT).
- // If nothing was reused (or 'udpsock' was given), a new multiplexer with its
- // own channel, timer and send/receive queues is created and stored in
- // m_mMultiplexer under the socket's ID.
- // m_GlobControlLock is held for the whole call. A failure while building the
- // new multiplexer destroys the partially built one; a CUDTException is
- // rethrown as is, anything else is reported as
- // CUDTException(MJ_SYSTEMRES, MN_MEMORY).
- void srt::CUDTUnited::updateMux(CUDTSocket* s, const sockaddr_any& reqaddr, const UDPSOCKET* udpsock /*[[nullable]]*/)
- {
- ScopedLock cg(m_GlobControlLock);
- // If udpsock is provided, then this socket will be simply
- // taken for binding as a good deal. It would be nice to make
- // a sanity check to see if this UDP socket isn't already installed
- // in some multiplexer, but we state this UDP socket isn't accessible
- // anyway so this wouldn't be possible.
- if (!udpsock)
- {
- // If not, we need to see if there exist already a multiplexer bound
- // to the same endpoint.
- const int port = reqaddr.hport();
- const CSrtConfig& cfgSocket = s->core().m_config;
- // This loop is going to check the attempted binding of
- // address:port and socket settings against every existing
- // multiplexer. Possible results of the check are:
- // 1. MATCH: identical address - reuse it and quit.
- // 2. CONFLICT: report error: the binding partially overlaps
- // so it neither can be reused nor is free to bind.
- // 3. PASS: different and not overlapping - continue searching.
- // In this function the convention is:
- // MATCH: do nothing and proceed with binding reusage, THEN break.
- // CONFLICT: throw an exception.
- // PASS: use 'continue' to pass to the next element.
- bool reuse_attempt = false;
- for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++i)
- {
- CMultiplexer& m = i->second;
- // First, we need to find a multiplexer with the same port.
- if (m.m_iPort != port)
- {
- HLOGC(smlog.Debug,
- log << "bind: muxer @" << m.m_iID << " found, but for port " << m.m_iPort
- << " (requested port: " << port << ")");
- continue;
- }
- // If this is bound to the wildcard address, it can be reused if:
- // - reqaddr is also a wildcard
- // - channel settings match
- // Otherwise it's a conflict.
- sockaddr_any mux_addr;
- m.m_pChannel->getSockAddr((mux_addr));
- HLOGC(smlog.Debug,
- log << "bind: Found existing muxer @" << m.m_iID << " : " << mux_addr.str() << " - check against "
- << reqaddr.str());
- if (mux_addr.isany())
- {
- if (mux_addr.family() == AF_INET6)
- {
- // With IPv6 we need to research two possibilities:
- // iIpV6Only == 1 -> This means that it binds only :: wildcard, but not 0.0.0.0
- // iIpV6Only == 0 -> This means that it binds both :: and 0.0.0.0.
- // iIpV6Only == -1 -> Hard to say what to do, but treat it as a potential conflict in any doubtful case.
- if (m.m_mcfg.iIpV6Only == 1)
- {
- // PASS IF: candidate is IPv4, no matter the address
- // MATCH IF: candidate is IPv6 with only=1
- // CONFLICT IF: candidate is IPv6 with only != 1 or IPv6 non-wildcard.
- if (reqaddr.family() == AF_INET)
- {
- HLOGC(smlog.Debug, log << "bind: muxer @" << m.m_iID
- << " is :: v6only - requested IPv4 ANY is NOT IN THE WAY. Searching on.");
- continue;
- }
- // Candidate is AF_INET6
- if (cfgSocket.iIpV6Only != 1 || !reqaddr.isany())
- {
- // CONFLICT:
- // 1. attempting to make a wildcard IPv4 + IPv6
- // while the multiplexer for wildcard IPv6 exists.
- // 2. If binding to a given address, it conflicts with the wildcard
- LOGC(smlog.Error,
- log << "bind: Address: " << reqaddr.str()
- << " conflicts with existing IPv6 wildcard binding: " << mux_addr.str());
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- // Otherwise, MATCH.
- }
- else if (m.m_mcfg.iIpV6Only == 0)
- {
- // Muxer's address is a wildcard for :: and 0.0.0.0 at once.
- // This way only IPv6 wildcard with v6only=0 is a perfect match and everything
- // else is a conflict.
- if (reqaddr.family() == AF_INET6 && reqaddr.isany() && cfgSocket.iIpV6Only == 0)
- {
- // MATCH
- }
- else
- {
- // CONFLICT: attempting to make a wildcard IPv4 + IPv6 while
- // the multiplexer for wildcard IPv6 exists.
- LOGC(smlog.Error,
- log << "bind: Address: " << reqaddr.str() << " v6only=" << cfgSocket.iIpV6Only
- << " conflicts with existing IPv6 + IPv4 wildcard binding: " << mux_addr.str());
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- }
- else // Case -1, by unknown reason. Accept only with -1 setting, others are conflict.
- {
- if (reqaddr.family() == AF_INET6 && reqaddr.isany() && cfgSocket.iIpV6Only == -1)
- {
- // MATCH
- }
- else
- {
- LOGC(smlog.Error,
- log << "bind: Address: " << reqaddr.str() << " v6only=" << cfgSocket.iIpV6Only
- << " conflicts with existing IPv6 v6only=unknown wildcard binding: " << mux_addr.str());
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- }
- }
- else // muxer is IPv4 wildcard
- {
- // Then only IPv4 wildcard is a match and:
- // - IPv6 with only=true is PASS (not a conflict)
- // - IPv6 with only=false is CONFLICT
- // - IPv6 with only=undefined is CONFLICT
- // REASON: we need to make a potential conflict a conflict as there will be
- // no bind() call to check if this wouldn't be a conflict in result. If you want
- // to have a binding to IPv6 that should avoid conflict with IPv4 wildcard binding,
- // then SRTO_IPV6ONLY option must be explicitly set before binding.
- // Also: a binding to a specific (non-wildcard) IPv6 address is a PASS.
- if (reqaddr.family() == AF_INET)
- {
- if (reqaddr.isany())
- {
- // MATCH
- }
- else
- {
- LOGC(smlog.Error,
- log << "bind: Address: " << reqaddr.str()
- << " conflicts with existing IPv4 wildcard binding: " << mux_addr.str());
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- }
- else // AF_INET6
- {
- if (cfgSocket.iIpV6Only == 1 || !reqaddr.isany())
- {
- // PASS
- HLOGC(smlog.Debug, log << "bind: muxer @" << m.m_iID
- << " is IPv4 wildcard - requested " << reqaddr.str() << " v6only=" << cfgSocket.iIpV6Only
- << " is NOT IN THE WAY. Searching on.");
- continue;
- }
- else
- {
- LOGC(smlog.Error,
- log << "bind: Address: " << reqaddr.str() << " v6only=" << cfgSocket.iIpV6Only
- << " conflicts with existing IPv4 wildcard binding: " << mux_addr.str());
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- }
- }
- // Only the MATCH cases above fall through to this point.
- reuse_attempt = true;
- HLOGC(smlog.Debug, log << "bind: wildcard address - multiplexer reusable");
- }
- // Muxer address is NOT a wildcard, so conflicts only with WILDCARD of the same type
- else if (reqaddr.isany() && reqaddr.family() == mux_addr.family())
- {
- LOGC(smlog.Error,
- log << "bind: Wildcard address: " << reqaddr.str()
- << " conflicts with existting IP binding: " << mux_addr.str());
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- // If this is bound to a certain address, AND:
- else if (mux_addr.equal_address(reqaddr))
- {
- // - the address is the same as reqaddr
- reuse_attempt = true;
- HLOGC(smlog.Debug, log << "bind: same IP address - multiplexer reusable");
- }
- else
- {
- HLOGC(smlog.Debug, log << "bind: IP addresses differ - ALLOWED to create a new multiplexer");
- continue;
- }
- // Otherwise:
- // - the address is different than reqaddr
- // - the address can't be reused, but this can go on with new one.
- // If this is a reusage attempt:
- if (reuse_attempt)
- {
- // - if the channel settings match, it can be reused
- if (channelSettingsMatch(m.m_mcfg, cfgSocket) && inet6SettingsCompat(mux_addr, m.m_mcfg, reqaddr, cfgSocket))
- {
- HLOGC(smlog.Debug, log << "bind: reusing multiplexer for port " << port);
- // reuse the existing multiplexer
- ++i->second.m_iRefCount;
- installMuxer((s), (i->second));
- return;
- }
- else
- {
- // - if not, it's a conflict
- LOGC(smlog.Error,
- log << "bind: Address: " << reqaddr.str() << " conflicts with binding: " << mux_addr.str()
- << " due to channel settings");
- throw CUDTException(MJ_NOTSUP, MN_BUSYPORT, 0);
- }
- }
- // If not, proceed to the next one, and when there are no reusage
- // candidates, proceed with creating a new multiplexer.
- // Note that a binding to a different IP address is not treated
- // as a candidate for either reusage or conflict.
- // Every path above either continues, returns or throws, so reaching
- // this point means the decision logic above has been broken.
- LOGC(smlog.Fatal, log << "SHOULD NOT GET HERE!!!");
- SRT_ASSERT(false);
- }
- }
- // a new multiplexer is needed
- CMultiplexer m;
- configureMuxer((m), s, reqaddr.family());
- try
- {
- m.m_pChannel = new CChannel();
- m.m_pChannel->setConfig(m.m_mcfg);
- if (udpsock)
- {
- // In this case, reqaddr contains the address
- // that has been extracted already from the
- // given socket
- m.m_pChannel->attach(*udpsock, reqaddr);
- }
- else if (reqaddr.empty())
- {
- // The case of previously used case of a NULL address.
- // This here is used to pass family only, in this case
- // just automatically bind to the "0" address to autoselect
- // everything.
- m.m_pChannel->open(reqaddr.family());
- }
- else
- {
- // If at least the IP address is specified, then bind to that
- // address, but still possibly autoselect the outgoing port, if the
- // port was specified as 0.
- m.m_pChannel->open(reqaddr);
- }
- // AFTER OPENING, check the matter of IPV6_V6ONLY option,
- // as it decides about the fact that the occupied binding address
- // in case of wildcard is both :: and 0.0.0.0, or only ::.
- if (reqaddr.family() == AF_INET6 && m.m_mcfg.iIpV6Only == -1)
- {
- // XXX We don't know how probable it is to get the error here
- // and resulting -1 value. As a fallback for that case, the value -1
- // is honored here, just all side-bindings for other sockets will be
- // rejected as a potential conflict, even if binding would be accepted
- // in these circumstances. Only a perfect match in case of potential
- // overlapping will be accepted on the same port.
- m.m_mcfg.iIpV6Only = m.m_pChannel->sockopt(IPPROTO_IPV6, IPV6_V6ONLY, -1);
- }
- m.m_pTimer = new CTimer;
- m.m_pSndQueue = new CSndQueue;
- m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
- m.m_pRcvQueue = new CRcvQueue;
- m.m_pRcvQueue->init(128, s->core().maxPayloadSize(), m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);
- // Rewrite the port here, as it might be only known upon return
- // from CChannel::open.
- m.m_iPort = installMuxer((s), m);
- // Register the new multiplexer under its ID (a copy is stored in the map).
- m_mMultiplexer[m.m_iID] = m;
- }
- catch (const CUDTException&)
- {
- // Release whatever has been created so far, then pass the error on.
- m.destroy();
- throw;
- }
- catch (...)
- {
- // Any other exception is reported to the caller as a memory error.
- m.destroy();
- throw CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0);
- }
- HLOGC(smlog.Debug, log << "bind: creating new multiplexer for port " << m.m_iPort);
- }
- // Attaches the accepted socket 's' to the multiplexer that serves its
- // listener socket 'ls'. The multiplexer is expected to exist as long as the
- // listener exists and is normally found by the ID recorded in the listener.
- // For historical reasons there is a fallback: when the lookup by ID fails,
- // the multiplexers are searched by the listener's port number.
- // Returns true when a multiplexer has been assigned to 's' (the multiplexer's
- // reference count is then incremented), false when none was found.
- bool srt::CUDTUnited::updateListenerMux(CUDTSocket* s, const CUDTSocket* ls)
- {
-     ScopedLock cg(m_GlobControlLock);
-     const int port = ls->m_SelfAddr.hport();
-     HLOGC(smlog.Debug,
-           log << "updateListenerMux: finding muxer of listener socket @" << ls->m_SocketID << " muxid=" << ls->m_iMuxID
-               << " bound=" << ls->m_SelfAddr.str() << " FOR @" << s->m_SocketID << " addr=" << s->m_SelfAddr.str()
-               << "_->_" << s->m_PeerAddr.str());
-
-     // The regular case: the muxer with the ID written in the listener exists.
-     CMultiplexer* chosen = map_getp(m_mMultiplexer, ls->m_iMuxID);
-
-     if (!chosen)
-     {
-         // NOTE:
-         // This part is only for a highly unlikely situation when the listener
-         // socket has been closed while the accepted socket is still being
-         // processed. Unlike in updateMux, the only goal here is to get some
-         // multiplexer assigned to the accepted socket. The listener is
-         // unlikely to be garbage-collected that fast, so most likely this
-         // finds the multiplexer of the zombie listener, which no longer
-         // accepts new connections but hasn't been completely deleted yet.
-         LOGC(smlog.Error, log << "updateListenerMux: IPE? listener muxer not found by ID, trying by port");
-
-         // Last seen IPv6 muxer on this port whose family differs from the peer's.
-         CMultiplexer* v6_candidate = NULL;
-
-         for (map<int, CMultiplexer>::iterator it = m_mMultiplexer.begin(); it != m_mMultiplexer.end(); ++it)
-         {
-             CMultiplexer& cand = it->second;
- #if ENABLE_HEAVY_LOGGING
-             ostringstream that_muxer;
-             that_muxer << "id=" << cand.m_iID << " port=" << cand.m_iPort
-                        << " ip=" << (cand.m_iIPversion == AF_INET ? "v4" : "v6");
- #endif
-             if (cand.m_iPort != port)
-             {
-                 HLOGC(smlog.Debug, log << "updateListenerMux: SKIPPING muxer: " << that_muxer.str());
-                 continue;
-             }
-
-             HLOGC(smlog.Debug, log << "updateListenerMux: reusing muxer: " << that_muxer.str());
-             if (cand.m_iIPversion == s->m_PeerAddr.family())
-             {
-                 // Best match: same port and same family as the peer.
-                 chosen = &cand;
-                 break;
-             }
-
-             // Allowed fallback case when we only need an accepted socket.
-             if (cand.m_iIPversion == AF_INET6)
-                 v6_candidate = &cand;
-         }
-
-         // A muxer of a different family may be reused only if its socket
-         // allows both IPv4 and IPv6.
-         if (!chosen && v6_candidate && v6_candidate->m_mcfg.iIpV6Only == 0)
-         {
-             HLOGC(smlog.Warn, log << "updateListenerMux: reusing multiplexer from different family");
-             chosen = v6_candidate;
-         }
-     }
-
-     // Still nothing after the by-port search: report failure.
-     if (!chosen)
-         return false;
-
-     // Reuse the existing multiplexer.
-     ++chosen->m_iRefCount;
-     s->core().m_pSndQueue = chosen->m_pSndQueue;
-     s->core().m_pRcvQueue = chosen->m_pRcvQueue;
-     s->m_iMuxID           = chosen->m_iID;
-     return true;
- }
- // Entry point of the garbage collector thread.
- //   p - pointer to the CUDTUnited instance this thread works for.
- // While the instance is not closing, checkBrokenSockets() is run and the
- // thread then waits on m_GCStopCond for up to 1 second. Once m_bClosing is
- // set, all sockets still in m_Sockets are broken and moved to m_ClosedSockets,
- // their closure timestamps are reset, and checkBrokenSockets() is repeated
- // (with a 1 ms pause between rounds) until m_ClosedSockets is empty.
- // Always returns NULL.
- void* srt::CUDTUnited::garbageCollect(void* p)
- {
-     CUDTUnited* self = static_cast<CUDTUnited*>(p);
-     THREAD_STATE_INIT("SRT:GC");
-     UniqueLock gclock(self->m_GCStopLock);
-     while (!self->m_bClosing)
-     {
-         INCREMENT_THREAD_ITERATIONS();
-         self->checkBrokenSockets();
-         HLOGC(inlog.Debug, log << "GC: sleep 1 s");
-         self->m_GCStopCond.wait_for(gclock, seconds_from(1));
-     }
-     // remove all sockets and multiplexers
-     HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all pending sockets. Acquring control lock...");
-     {
-         ScopedLock glock(self->m_GlobControlLock);
-         for (sockets_t::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++i)
-         {
-             CUDTSocket* s = i->second;
-             s->breakSocket_LOCKED();
- #if ENABLE_BONDING
-             if (s->m_GroupOf)
-             {
-                 HLOGC(smlog.Debug,
-                       log << "@" << s->m_SocketID << " IS MEMBER OF $" << s->m_GroupOf->id()
-                           << " (IPE?) - REMOVING FROM GROUP");
-                 s->removeFromGroup(false);
-             }
- #endif
-             self->m_ClosedSockets[i->first] = s;
-             // remove from listener's queue; the listener may be either
-             // still active or already moved to the closed sockets
-             sockets_t::iterator ls = self->m_Sockets.find(s->m_ListenSocket);
-             if (ls == self->m_Sockets.end())
-             {
-                 ls = self->m_ClosedSockets.find(s->m_ListenSocket);
-                 if (ls == self->m_ClosedSockets.end())
-                     continue;
-             }
-             // Scoped guard instead of a manual enterCS/leaveCS pair, consistent
-             // with the other locks taken in this function.
-             ScopedLock acclock(ls->second->m_AcceptLock);
-             ls->second->m_QueuedSockets.erase(s->m_SocketID);
-         }
-         self->m_Sockets.clear();
-         // Reset the closure timestamp of every closed socket to the default
-         // (zero) time point.
-         for (sockets_t::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++j)
-         {
-             j->second->m_tsClosureTimeStamp = steady_clock::time_point();
-         }
-     }
-     HLOGC(inlog.Debug, log << "GC: GLOBAL EXIT - releasing all CLOSED sockets.");
-     while (true)
-     {
-         self->checkBrokenSockets();
-         bool empty = false;
-         {
-             ScopedLock glock(self->m_GlobControlLock);
-             empty = self->m_ClosedSockets.empty();
-         }
-         if (empty)
-             break;
-         // The pause below is 1 millisecond; the message used to claim 1 second.
-         HLOGC(inlog.Debug, log << "GC: checkBrokenSockets didn't wipe all sockets, repeating after 1 ms sleep");
-         srt::sync::this_thread::sleep_for(milliseconds_from(1));
-     }
-     THREAD_EXIT();
-     return NULL;
- }
- ////////////////////////////////////////////////////////////////////////////////
- // Starts up the library by delegating to the global CUDTUnited object;
- // the result of that call is returned unchanged.
- int srt::CUDT::startup()
- {
-     const int status = uglobal().startup();
-     return status;
- }
- // Cleans up the library by delegating to the global CUDTUnited object;
- // the result of that call is returned unchanged.
- int srt::CUDT::cleanup()
- {
-     const int status = uglobal().cleanup();
-     return status;
- }
- // Creates a new socket and returns its ID.
- // If the m_bGCStatus flag of the global object is not set, startup() is
- // called first. On failure the error is stored as the thread-local error
- // and INVALID_SOCK is returned.
- SRTSOCKET srt::CUDT::socket()
- {
-     if (!uglobal().m_bGCStatus)
-         uglobal().startup();
-
-     try
-     {
-         return uglobal().newSocket();
-     }
-     catch (const CUDTException& cex)
-     {
-         SetThreadLocalError(cex);
-     }
-     catch (const bad_alloc&)
-     {
-         SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "socket: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
-     }
-
-     // Reached only through one of the handlers above.
-     return INVALID_SOCK;
- }
- srt::CUDT::APIError::APIError(const CUDTException& e)
- {
- SetThreadLocalError(e);
- }
- srt::CUDT::APIError::APIError(CodeMajor mj, CodeMinor mn, int syserr)
- {
- SetThreadLocalError(CUDTException(mj, mn, syserr));
- }
- #if ENABLE_BONDING
- // Internal helper creating a new group of the given type.
- // 'type' is expected to be already verified by the caller; it is passed as
- // int rather than as the group type enum due to header file conflicts.
- // [[using locked(s_UDTUnited.m_GlobControlLock)]]
- srt::CUDTGroup& srt::CUDT::newGroup(const int type)
- {
-     // Get a fresh ID for the group first...
-     const SRTSOCKET gid = uglobal().generateSocketID(true);
-
-     // ...then register the group under this ID and assign the ID to it.
-     CUDTGroup& grp = uglobal().addGroup(gid, SRT_GROUP_TYPE(type));
-     return grp.set_id(gid);
- }
- // Creates a new group of type 'gt' and returns its ID.
- // Performs the same lazy startup as socket creation does. A CUDTException
- // is reported through APIError as is; any other exception is reported as
- // a memory error.
- SRTSOCKET srt::CUDT::createGroup(SRT_GROUP_TYPE gt)
- {
-     // Same lazy startup as with srt_create_socket()
-     if (!uglobal().m_bGCStatus)
-         uglobal().startup();
-
-     try
-     {
-         srt::sync::ScopedLock globlock(uglobal().m_GlobControlLock);
-         const SRTSOCKET gid = newGroup(gt).id();
-         // Note: once this function exits, the group could potentially be
-         // deleted right away from another thread (though unlikely, as that
-         // thread would need some handle to it). In such a case the first
-         // API call using this ID simply returns the invalid ID error.
-         return gid;
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (...)
-     {
-         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
-     }
-
-     return SRT_INVALID_SOCK;
- }
- // [[using locked(m_ControlLock)]]
- // [[using locked(CUDT::s_UDTUnited.m_GlobControlLock)]]
- // Detaches this socket from the group it belongs to; does nothing when the
- // socket is not a group member.
- //   broken - true when the removal is caused by a previously connected
- //            socket getting broken; only then the update event is activated
- //            on the group.
- void srt::CUDTSocket::removeFromGroup(bool broken)
- {
-     CUDTGroup* const grp = m_GroupOf;
-     if (!grp)
-         return;
-
-     // Clear the group-related fields up front. They are not accessed by the
-     // calls below, and the member iterator becomes invalid for a short
-     // moment between the removal from the group container and the end of
-     // this function, when the GroupLock is already released. Resetting them
-     // before the removal is the safer order.
-     m_GroupOf         = NULL;
-     m_GroupMemberData = NULL;
-
-     const bool still_have = grp->remove(m_SocketID);
-
-     // SRT_EPOLL_UPDATE is to be activated on the group only for a socket that
-     // was connected earlier and became broken - not for a failure during
-     // connecting, nor for an explicit removal from the group.
-     if (broken)
-         grp->activateUpdateEvent(still_have);
-
-     HLOGC(smlog.Debug,
-           log << "removeFromGroup: socket @" << m_SocketID << " NO LONGER A MEMBER of $" << grp->id() << "; group is "
-               << (still_have ? "still ACTIVE" : "now EMPTY"));
- }
- // Returns the ID of the group that 'socket' is a member of.
- // Reports an invalid-argument API error when the socket cannot be located
- // or is not a member of any group.
- SRTSOCKET srt::CUDT::getGroupOfSocket(SRTSOCKET socket)
- {
-     // The lock is kept for the whole function, because the group
-     // must persist until its ID has been read.
-     ScopedLock glock(uglobal().m_GlobControlLock);
-
-     CUDTSocket* const member = uglobal().locateSocket_LOCKED(socket);
-     if (member && member->m_GroupOf)
-         return member->m_GroupOf->id();
-
-     return APIError(MJ_NOTSUP, MN_INVAL, 0);
- }
- // Retrieves the member data of the group 'groupid'.
- //   pdata - output array; NULL may be passed to get only the size
- //   psize - in/out size argument, must not be NULL
- // Reports an invalid-argument API error when 'groupid' is not a group ID,
- // 'psize' is NULL or the group cannot be found; otherwise returns the
- // result of the group's getGroupData().
- int srt::CUDT::getGroupData(SRTSOCKET groupid, SRT_SOCKGROUPDATA* pdata, size_t* psize)
- {
-     const bool is_group_id = (groupid & SRTGROUP_MASK) != 0;
-     if (!is_group_id || psize == NULL)
-         return APIError(MJ_NOTSUP, MN_INVAL, 0);
-
-     CUDTUnited::GroupKeeper k(uglobal(), groupid, CUDTUnited::ERH_RETURN);
-     if (k.group)
-     {
-         // To get only the size of the group pdata=NULL can be used
-         return k.group->getGroupData(pdata, psize);
-     }
-
-     return APIError(MJ_NOTSUP, MN_INVAL, 0);
- }
- #endif
- // Binds socket 'u' to the address given by 'name' / 'namelen'.
- // Reports an invalid-argument API error when the address is not usable or
- // the socket cannot be located; exceptions are converted to API errors.
- int srt::CUDT::bind(SRTSOCKET u, const sockaddr* name, int namelen)
- {
-     try
-     {
-         sockaddr_any addr(name, namelen);
-
-         // A zero length means the namelen check found it too small for the
-         // particular family, or the family is not recognized (neither
-         // AF_INET nor AF_INET6). Either way it is a user error.
-         if (addr.len == 0)
-             return APIError(MJ_NOTSUP, MN_INVAL, 0);
-
-         CUDTSocket* const sock = uglobal().locateSocket(u);
-         if (sock == NULL)
-             return APIError(MJ_NOTSUP, MN_INVAL, 0);
-
-         return uglobal().bind(sock, addr);
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (const bad_alloc&)
-     {
-         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "bind: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
- }
- // Binds socket 'u' using the already existing UDP socket 'udpsock'.
- // Reports an invalid-argument API error when the socket cannot be located;
- // exceptions are converted to API errors.
- int srt::CUDT::bind(SRTSOCKET u, UDPSOCKET udpsock)
- {
-     try
-     {
-         CUDTSocket* const sock = uglobal().locateSocket(u);
-         if (sock == NULL)
-             return APIError(MJ_NOTSUP, MN_INVAL, 0);
-
-         return uglobal().bind(sock, udpsock);
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (const bad_alloc&)
-     {
-         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "bind/udp: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
- }
- // Puts socket 'u' into the listening state with the given 'backlog' by
- // delegating to the global object; exceptions are converted to API errors.
- int srt::CUDT::listen(SRTSOCKET u, int backlog)
- {
-     try
-     {
-         const int status = uglobal().listen(u, backlog);
-         return status;
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (const bad_alloc&)
-     {
-         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "listen: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
- }
- // Accepts a connection on any of the 'lsize' listener sockets given in
- // 'listeners', with 'msTimeOut' passed on as the timeout, by delegating to
- // the global object. On failure the error is stored as the thread-local
- // error and INVALID_SOCK is returned.
- SRTSOCKET srt::CUDT::accept_bond(const SRTSOCKET listeners[], int lsize, int64_t msTimeOut)
- {
-     try
-     {
-         return uglobal().accept_bond(listeners, lsize, msTimeOut);
-     }
-     catch (const CUDTException& cex)
-     {
-         SetThreadLocalError(cex);
-     }
-     catch (const bad_alloc&)
-     {
-         SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "accept_bond: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
-     }
-
-     // Reached only through one of the handlers above.
-     return INVALID_SOCK;
- }
- // Accepts a connection on the listener socket 'u' by delegating to the
- // global object; 'addr' and 'addrlen' are passed through to that call.
- // On failure the error is stored as the thread-local error and
- // INVALID_SOCK is returned.
- SRTSOCKET srt::CUDT::accept(SRTSOCKET u, sockaddr* addr, int* addrlen)
- {
-     try
-     {
-         return uglobal().accept(u, addr, addrlen);
-     }
-     catch (const CUDTException& cex)
-     {
-         SetThreadLocalError(cex);
-     }
-     catch (const bad_alloc&)
-     {
-         SetThreadLocalError(CUDTException(MJ_SYSTEMRES, MN_MEMORY, 0));
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "accept: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
-     }
-
-     // Reached only through one of the handlers above.
-     return INVALID_SOCK;
- }
- // Connects socket 'u' by delegating to the global object, passing the two
- // addresses 'name' and 'tname' (both of length 'namelen') through.
- // Exceptions are converted to API errors.
- int srt::CUDT::connect(SRTSOCKET u, const sockaddr* name, const sockaddr* tname, int namelen)
- {
-     try
-     {
-         const int status = uglobal().connect(u, name, tname, namelen);
-         return status;
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (const bad_alloc&)
-     {
-         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
- }
- #if ENABLE_BONDING
- // Connects the group 'grp' to the 'arraysize' targets given in 'targets'.
- // Reports an API error when 'arraysize' is not positive (MN_INVAL) or when
- // 'grp' is not a group ID (MN_SIDINVAL). The group is looked up with
- // ERH_THROW; exceptions thrown by the lookup or by the connection are
- // converted to API errors.
- int srt::CUDT::connectLinks(SRTSOCKET grp, SRT_SOCKGROUPCONFIG targets[], int arraysize)
- {
-     if (arraysize <= 0)
-         return APIError(MJ_NOTSUP, MN_INVAL, 0);
-     if ((grp & SRTGROUP_MASK) == 0)
-     {
-         // connectLinks accepts only GROUP id, not socket id.
-         return APIError(MJ_NOTSUP, MN_SIDINVAL, 0);
-     }
-     try
-     {
-         CUDTUnited::GroupKeeper k(uglobal(), grp, CUDTUnited::ERH_THROW);
-         return uglobal().groupConnect(k.group, targets, arraysize);
-     }
-     catch (const CUDTException& e)
-     {
-         return APIError(e);
-     }
-     catch (const bad_alloc&)
-     {
-         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
-     }
-     catch (const std::exception& ee)
-     {
-         // The message names this function, so that the log entry is not
-         // attributed to CUDT::connect.
-         LOGC(aclog.Fatal, log << "connectLinks: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
- }
- #endif
- // Connects socket 'u' to the address 'name' / 'namelen' by delegating to
- // the global object; 'forced_isn' is passed through to that call.
- // Exceptions are converted to API errors.
- int srt::CUDT::connect(SRTSOCKET u, const sockaddr* name, int namelen, int32_t forced_isn)
- {
-     try
-     {
-         const int status = uglobal().connect(u, name, namelen, forced_isn);
-         return status;
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (const bad_alloc&)
-     {
-         return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "connect: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
- }
- // Closes the entity identified by 'u' by delegating to the global object;
- // exceptions are converted to API errors.
- int srt::CUDT::close(SRTSOCKET u)
- {
-     try
-     {
-         const int status = uglobal().close(u);
-         return status;
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "close: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
- }
- // Retrieves the peer address of socket 'u' into 'name' / 'namelen' by
- // delegating to the global object.
- // Returns 0 on success; exceptions are converted to API errors.
- int srt::CUDT::getpeername(SRTSOCKET u, sockaddr* name, int* namelen)
- {
-     try
-     {
-         uglobal().getpeername(u, name, namelen);
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "getpeername: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
-
-     // No exception: the call succeeded.
-     return 0;
- }
- // Retrieves the local address of socket 'u' into 'name' / 'namelen' by
- // delegating to the global object.
- // Returns 0 on success; exceptions are converted to API errors.
- int srt::CUDT::getsockname(SRTSOCKET u, sockaddr* name, int* namelen)
- {
-     try
-     {
-         uglobal().getsockname(u, name, namelen);
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "getsockname: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
-
-     // No exception: the call succeeded.
-     return 0;
- }
- // Reads option 'optname' of the socket or group 'u' into 'pw_optval' /
- // 'pw_optlen'. The second (unnamed) parameter is not used.
- // Reports an invalid-argument API error when either output pointer is NULL.
- // Returns 0 on success; exceptions are converted to API errors.
- int srt::CUDT::getsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, void* pw_optval, int* pw_optlen)
- {
-     if (pw_optval == NULL || pw_optlen == NULL)
-         return APIError(MJ_NOTSUP, MN_INVAL, 0);
-
-     try
-     {
- #if ENABLE_BONDING
-         // IDs with the group bit set are served by the group object.
-         if (u & SRTGROUP_MASK)
-         {
-             CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
-             k.group->getOpt(optname, (pw_optval), (*pw_optlen));
-             return 0;
-         }
- #endif
-         // A missing socket is reported by locateSocket() through an exception.
-         uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().getOpt(optname, (pw_optval), (*pw_optlen));
-         return 0;
-     }
-     catch (const CUDTException& cex)
-     {
-         return APIError(cex);
-     }
-     catch (const std::exception& ex)
-     {
-         LOGC(aclog.Fatal, log << "getsockopt: UNEXPECTED EXCEPTION: " << typeid(ex).name() << ": " << ex.what());
-         return APIError(MJ_UNKNOWN, MN_NONE, 0);
-     }
- }
- int srt::CUDT::setsockopt(SRTSOCKET u, int, SRT_SOCKOPT optname, const void* optval, int optlen)
- {
- if (!optval)
- return APIError(MJ_NOTSUP, MN_INVAL, 0);
- try
- {
- #if ENABLE_BONDING
- if (u & SRTGROUP_MASK)
- {
- CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
- k.group->setOpt(optname, optval, optlen);
- return 0;
- }
- #endif
- CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
- udt.setOpt(optname, optval, optlen);
- return 0;
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "setsockopt: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::send(SRTSOCKET u, const char* buf, int len, int)
- {
- SRT_MSGCTRL mctrl = srt_msgctrl_default;
- return sendmsg2(u, buf, len, (mctrl));
- }
- // --> CUDT::recv moved down
- int srt::CUDT::sendmsg(SRTSOCKET u, const char* buf, int len, int ttl, bool inorder, int64_t srctime)
- {
- SRT_MSGCTRL mctrl = srt_msgctrl_default;
- mctrl.msgttl = ttl;
- mctrl.inorder = inorder;
- mctrl.srctime = srctime;
- return sendmsg2(u, buf, len, (mctrl));
- }
- int srt::CUDT::sendmsg2(SRTSOCKET u, const char* buf, int len, SRT_MSGCTRL& w_m)
- {
- try
- {
- #if ENABLE_BONDING
- if (u & SRTGROUP_MASK)
- {
- CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
- return k.group->send(buf, len, (w_m));
- }
- #endif
- return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().sendmsg2(buf, len, (w_m));
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (bad_alloc&)
- {
- return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "sendmsg: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::recv(SRTSOCKET u, char* buf, int len, int)
- {
- SRT_MSGCTRL mctrl = srt_msgctrl_default;
- int ret = recvmsg2(u, buf, len, (mctrl));
- return ret;
- }
- int srt::CUDT::recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
- {
- SRT_MSGCTRL mctrl = srt_msgctrl_default;
- int ret = recvmsg2(u, buf, len, (mctrl));
- srctime = mctrl.srctime;
- return ret;
- }
- int srt::CUDT::recvmsg2(SRTSOCKET u, char* buf, int len, SRT_MSGCTRL& w_m)
- {
- try
- {
- #if ENABLE_BONDING
- if (u & SRTGROUP_MASK)
- {
- CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
- return k.group->recv(buf, len, (w_m));
- }
- #endif
- return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvmsg2(buf, len, (w_m));
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "recvmsg: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int64_t srt::CUDT::sendfile(SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block)
- {
- try
- {
- CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
- return udt.sendfile(ifs, offset, size, block);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (bad_alloc&)
- {
- return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "sendfile: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int64_t srt::CUDT::recvfile(SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block)
- {
- try
- {
- return uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core().recvfile(ofs, offset, size, block);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "recvfile: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::select(int, UDT::UDSET* readfds, UDT::UDSET* writefds, UDT::UDSET* exceptfds, const timeval* timeout)
- {
- if ((!readfds) && (!writefds) && (!exceptfds))
- {
- return APIError(MJ_NOTSUP, MN_INVAL, 0);
- }
- try
- {
- return uglobal().select(readfds, writefds, exceptfds, timeout);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (bad_alloc&)
- {
- return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "select: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::selectEx(const vector<SRTSOCKET>& fds,
- vector<SRTSOCKET>* readfds,
- vector<SRTSOCKET>* writefds,
- vector<SRTSOCKET>* exceptfds,
- int64_t msTimeOut)
- {
- if ((!readfds) && (!writefds) && (!exceptfds))
- {
- return APIError(MJ_NOTSUP, MN_INVAL, 0);
- }
- try
- {
- return uglobal().selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (bad_alloc&)
- {
- return APIError(MJ_SYSTEMRES, MN_MEMORY, 0);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "selectEx: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN);
- }
- }
- int srt::CUDT::epoll_create()
- {
- try
- {
- return uglobal().epoll_create();
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "epoll_create: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_clear_usocks(int eid)
- {
- try
- {
- return uglobal().epoll_clear_usocks(eid);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (std::exception& ee)
- {
- LOGC(aclog.Fatal,
- log << "epoll_clear_usocks: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_add_usock(const int eid, const SRTSOCKET u, const int* events)
- {
- try
- {
- return uglobal().epoll_add_usock(eid, u, events);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "epoll_add_usock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_add_ssock(const int eid, const SYSSOCKET s, const int* events)
- {
- try
- {
- return uglobal().epoll_add_ssock(eid, s, events);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "epoll_add_ssock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_update_usock(const int eid, const SRTSOCKET u, const int* events)
- {
- try
- {
- return uglobal().epoll_add_usock(eid, u, events);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal,
- log << "epoll_update_usock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_update_ssock(const int eid, const SYSSOCKET s, const int* events)
- {
- try
- {
- return uglobal().epoll_update_ssock(eid, s, events);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal,
- log << "epoll_update_ssock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_remove_usock(const int eid, const SRTSOCKET u)
- {
- try
- {
- return uglobal().epoll_remove_usock(eid, u);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal,
- log << "epoll_remove_usock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_remove_ssock(const int eid, const SYSSOCKET s)
- {
- try
- {
- return uglobal().epoll_remove_ssock(eid, s);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal,
- log << "epoll_remove_ssock: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_wait(const int eid,
- set<SRTSOCKET>* readfds,
- set<SRTSOCKET>* writefds,
- int64_t msTimeOut,
- set<SYSSOCKET>* lrfds,
- set<SYSSOCKET>* lwfds)
- {
- try
- {
- return uglobal().epoll_ref().wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "epoll_wait: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_uwait(const int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
- {
- try
- {
- return uglobal().epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "epoll_uwait: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int32_t srt::CUDT::epoll_set(const int eid, int32_t flags)
- {
- try
- {
- return uglobal().epoll_set(eid, flags);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "epoll_set: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- int srt::CUDT::epoll_release(const int eid)
- {
- try
- {
- return uglobal().epoll_release(eid);
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "epoll_release: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
- srt::CUDTException& srt::CUDT::getlasterror()
- {
- return GetThreadLocalError();
- }
- int srt::CUDT::bstats(SRTSOCKET u, CBytePerfMon* perf, bool clear, bool instantaneous)
- {
- #if ENABLE_BONDING
- if (u & SRTGROUP_MASK)
- return groupsockbstats(u, perf, clear);
- #endif
- try
- {
- CUDT& udt = uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
- udt.bstats(perf, clear, instantaneous);
- return 0;
- }
- catch (const CUDTException& e)
- {
- return APIError(e);
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "bstats: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- return APIError(MJ_UNKNOWN, MN_NONE, 0);
- }
- }
#if ENABLE_BONDING
// Group flavour of bstats(): looks the group up through a GroupKeeper and
// calls its bstatsSocket(perf, clear). Returns 0 when that completes
// without throwing. On an exception the thread-local error is set via
// SetThreadLocalError() and ERROR is returned.
int srt::CUDT::groupsockbstats(SRTSOCKET u, CBytePerfMon* perf, bool clear)
{
    try
    {
        CUDTUnited::GroupKeeper keeper(uglobal(), u, CUDTUnited::ERH_THROW);
        keeper.group->bstatsSocket(perf, clear);
        return 0;
    }
    catch (const CUDTException& srt_err)
    {
        SetThreadLocalError(srt_err);
    }
    catch (const std::exception& unexpected)
    {
        LOGC(aclog.Fatal,
             log << "bstats: UNEXPECTED EXCEPTION: " << typeid(unexpected).name() << ": " << unexpected.what());
        SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
    }

    // Only the exception paths fall through to here.
    return ERROR;
}
#endif
- srt::CUDT* srt::CUDT::getUDTHandle(SRTSOCKET u)
- {
- try
- {
- return &uglobal().locateSocket(u, CUDTUnited::ERH_THROW)->core();
- }
- catch (const CUDTException& e)
- {
- SetThreadLocalError(e);
- return NULL;
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "getUDTHandle: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
- return NULL;
- }
- }
- vector<SRTSOCKET> srt::CUDT::existingSockets()
- {
- vector<SRTSOCKET> out;
- for (CUDTUnited::sockets_t::iterator i = uglobal().m_Sockets.begin(); i != uglobal().m_Sockets.end(); ++i)
- {
- out.push_back(i->first);
- }
- return out;
- }
- SRT_SOCKSTATUS srt::CUDT::getsockstate(SRTSOCKET u)
- {
- try
- {
- #if ENABLE_BONDING
- if (isgroup(u))
- {
- CUDTUnited::GroupKeeper k(uglobal(), u, CUDTUnited::ERH_THROW);
- return k.group->getStatus();
- }
- #endif
- return uglobal().getStatus(u);
- }
- catch (const CUDTException& e)
- {
- SetThreadLocalError(e);
- return SRTS_NONEXIST;
- }
- catch (const std::exception& ee)
- {
- LOGC(aclog.Fatal, log << "getsockstate: UNEXPECTED EXCEPTION: " << typeid(ee).name() << ": " << ee.what());
- SetThreadLocalError(CUDTException(MJ_UNKNOWN, MN_NONE, 0));
- return SRTS_NONEXIST;
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
- namespace UDT
- {
- int startup()
- {
- return srt::CUDT::startup();
- }
- int cleanup()
- {
- return srt::CUDT::cleanup();
- }
- int bind(SRTSOCKET u, const struct sockaddr* name, int namelen)
- {
- return srt::CUDT::bind(u, name, namelen);
- }
- int bind2(SRTSOCKET u, UDPSOCKET udpsock)
- {
- return srt::CUDT::bind(u, udpsock);
- }
- int listen(SRTSOCKET u, int backlog)
- {
- return srt::CUDT::listen(u, backlog);
- }
- SRTSOCKET accept(SRTSOCKET u, struct sockaddr* addr, int* addrlen)
- {
- return srt::CUDT::accept(u, addr, addrlen);
- }
- int connect(SRTSOCKET u, const struct sockaddr* name, int namelen)
- {
- return srt::CUDT::connect(u, name, namelen, SRT_SEQNO_NONE);
- }
- int close(SRTSOCKET u)
- {
- return srt::CUDT::close(u);
- }
- int getpeername(SRTSOCKET u, struct sockaddr* name, int* namelen)
- {
- return srt::CUDT::getpeername(u, name, namelen);
- }
- int getsockname(SRTSOCKET u, struct sockaddr* name, int* namelen)
- {
- return srt::CUDT::getsockname(u, name, namelen);
- }
- int getsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, void* optval, int* optlen)
- {
- return srt::CUDT::getsockopt(u, level, optname, optval, optlen);
- }
- int setsockopt(SRTSOCKET u, int level, SRT_SOCKOPT optname, const void* optval, int optlen)
- {
- return srt::CUDT::setsockopt(u, level, optname, optval, optlen);
- }
- // DEVELOPER API
- int connect_debug(SRTSOCKET u, const struct sockaddr* name, int namelen, int32_t forced_isn)
- {
- return srt::CUDT::connect(u, name, namelen, forced_isn);
- }
- int send(SRTSOCKET u, const char* buf, int len, int flags)
- {
- return srt::CUDT::send(u, buf, len, flags);
- }
- int recv(SRTSOCKET u, char* buf, int len, int flags)
- {
- return srt::CUDT::recv(u, buf, len, flags);
- }
- int sendmsg(SRTSOCKET u, const char* buf, int len, int ttl, bool inorder, int64_t srctime)
- {
- return srt::CUDT::sendmsg(u, buf, len, ttl, inorder, srctime);
- }
- int recvmsg(SRTSOCKET u, char* buf, int len, int64_t& srctime)
- {
- return srt::CUDT::recvmsg(u, buf, len, srctime);
- }
- int recvmsg(SRTSOCKET u, char* buf, int len)
- {
- int64_t srctime;
- return srt::CUDT::recvmsg(u, buf, len, srctime);
- }
- int64_t sendfile(SRTSOCKET u, fstream& ifs, int64_t& offset, int64_t size, int block)
- {
- return srt::CUDT::sendfile(u, ifs, offset, size, block);
- }
- int64_t recvfile(SRTSOCKET u, fstream& ofs, int64_t& offset, int64_t size, int block)
- {
- return srt::CUDT::recvfile(u, ofs, offset, size, block);
- }
- int64_t sendfile2(SRTSOCKET u, const char* path, int64_t* offset, int64_t size, int block)
- {
- fstream ifs(path, ios::binary | ios::in);
- int64_t ret = srt::CUDT::sendfile(u, ifs, *offset, size, block);
- ifs.close();
- return ret;
- }
- int64_t recvfile2(SRTSOCKET u, const char* path, int64_t* offset, int64_t size, int block)
- {
- fstream ofs(path, ios::binary | ios::out);
- int64_t ret = srt::CUDT::recvfile(u, ofs, *offset, size, block);
- ofs.close();
- return ret;
- }
- int select(int nfds, UDSET* readfds, UDSET* writefds, UDSET* exceptfds, const struct timeval* timeout)
- {
- return srt::CUDT::select(nfds, readfds, writefds, exceptfds, timeout);
- }
- int selectEx(const vector<SRTSOCKET>& fds,
- vector<SRTSOCKET>* readfds,
- vector<SRTSOCKET>* writefds,
- vector<SRTSOCKET>* exceptfds,
- int64_t msTimeOut)
- {
- return srt::CUDT::selectEx(fds, readfds, writefds, exceptfds, msTimeOut);
- }
- int epoll_create()
- {
- return srt::CUDT::epoll_create();
- }
- int epoll_clear_usocks(int eid)
- {
- return srt::CUDT::epoll_clear_usocks(eid);
- }
- int epoll_add_usock(int eid, SRTSOCKET u, const int* events)
- {
- return srt::CUDT::epoll_add_usock(eid, u, events);
- }
- int epoll_add_ssock(int eid, SYSSOCKET s, const int* events)
- {
- return srt::CUDT::epoll_add_ssock(eid, s, events);
- }
- int epoll_update_usock(int eid, SRTSOCKET u, const int* events)
- {
- return srt::CUDT::epoll_update_usock(eid, u, events);
- }
- int epoll_update_ssock(int eid, SYSSOCKET s, const int* events)
- {
- return srt::CUDT::epoll_update_ssock(eid, s, events);
- }
- int epoll_remove_usock(int eid, SRTSOCKET u)
- {
- return srt::CUDT::epoll_remove_usock(eid, u);
- }
- int epoll_remove_ssock(int eid, SYSSOCKET s)
- {
- return srt::CUDT::epoll_remove_ssock(eid, s);
- }
- int epoll_wait(int eid,
- set<SRTSOCKET>* readfds,
- set<SRTSOCKET>* writefds,
- int64_t msTimeOut,
- set<SYSSOCKET>* lrfds,
- set<SYSSOCKET>* lwfds)
- {
- return srt::CUDT::epoll_wait(eid, readfds, writefds, msTimeOut, lrfds, lwfds);
- }
- template <class SOCKTYPE>
- inline void set_result(set<SOCKTYPE>* val, int* num, SOCKTYPE* fds)
- {
- if (!val || !num || !fds)
- return;
- if (*num > int(val->size()))
- *num = int(val->size()); // will get 0 if val->empty()
- int count = 0;
- // This loop will run 0 times if val->empty()
- for (typename set<SOCKTYPE>::const_iterator it = val->begin(); it != val->end(); ++it)
- {
- if (count >= *num)
- break;
- fds[count++] = *it;
- }
- }
- int epoll_wait2(int eid,
- SRTSOCKET* readfds,
- int* rnum,
- SRTSOCKET* writefds,
- int* wnum,
- int64_t msTimeOut,
- SYSSOCKET* lrfds,
- int* lrnum,
- SYSSOCKET* lwfds,
- int* lwnum)
- {
- // This API is an alternative format for epoll_wait, created for
- // compatibility with other languages. Users need to pass in an array
- // for holding the returned sockets, with the maximum array length
- // stored in *rnum, etc., which will be updated with returned number
- // of sockets.
- set<SRTSOCKET> readset;
- set<SRTSOCKET> writeset;
- set<SYSSOCKET> lrset;
- set<SYSSOCKET> lwset;
- set<SRTSOCKET>* rval = NULL;
- set<SRTSOCKET>* wval = NULL;
- set<SYSSOCKET>* lrval = NULL;
- set<SYSSOCKET>* lwval = NULL;
- if ((readfds != NULL) && (rnum != NULL))
- rval = &readset;
- if ((writefds != NULL) && (wnum != NULL))
- wval = &writeset;
- if ((lrfds != NULL) && (lrnum != NULL))
- lrval = &lrset;
- if ((lwfds != NULL) && (lwnum != NULL))
- lwval = &lwset;
- int ret = srt::CUDT::epoll_wait(eid, rval, wval, msTimeOut, lrval, lwval);
- if (ret > 0)
- {
- // set<SRTSOCKET>::const_iterator i;
- // SET_RESULT(rval, rnum, readfds, i);
- set_result(rval, rnum, readfds);
- // SET_RESULT(wval, wnum, writefds, i);
- set_result(wval, wnum, writefds);
- // set<SYSSOCKET>::const_iterator j;
- // SET_RESULT(lrval, lrnum, lrfds, j);
- set_result(lrval, lrnum, lrfds);
- // SET_RESULT(lwval, lwnum, lwfds, j);
- set_result(lwval, lwnum, lwfds);
- }
- return ret;
- }
- int epoll_uwait(int eid, SRT_EPOLL_EVENT* fdsSet, int fdsSize, int64_t msTimeOut)
- {
- return srt::CUDT::epoll_uwait(eid, fdsSet, fdsSize, msTimeOut);
- }
- int epoll_release(int eid)
- {
- return srt::CUDT::epoll_release(eid);
- }
- ERRORINFO& getlasterror()
- {
- return srt::CUDT::getlasterror();
- }
- int getlasterror_code()
- {
- return srt::CUDT::getlasterror().getErrorCode();
- }
- const char* getlasterror_desc()
- {
- return srt::CUDT::getlasterror().getErrorMessage();
- }
- int getlasterror_errno()
- {
- return srt::CUDT::getlasterror().getErrno();
- }
- // Get error string of a given error code
- const char* geterror_desc(int code, int err)
- {
- srt::CUDTException e(CodeMajor(code / 1000), CodeMinor(code % 1000), err);
- return (e.getErrorMessage());
- }
- int bstats(SRTSOCKET u, SRT_TRACEBSTATS* perf, bool clear)
- {
- return srt::CUDT::bstats(u, perf, clear);
- }
- SRT_SOCKSTATUS getsockstate(SRTSOCKET u)
- {
- return srt::CUDT::getsockstate(u);
- }
- } // namespace UDT
- namespace srt
- {
- void setloglevel(LogLevel::type ll)
- {
- ScopedLock gg(srt_logger_config.mutex);
- srt_logger_config.max_level = ll;
- }
- void addlogfa(LogFA fa)
- {
- ScopedLock gg(srt_logger_config.mutex);
- srt_logger_config.enabled_fa.set(fa, true);
- }
- void dellogfa(LogFA fa)
- {
- ScopedLock gg(srt_logger_config.mutex);
- srt_logger_config.enabled_fa.set(fa, false);
- }
- void resetlogfa(set<LogFA> fas)
- {
- ScopedLock gg(srt_logger_config.mutex);
- for (int i = 0; i <= SRT_LOGFA_LASTNONE; ++i)
- srt_logger_config.enabled_fa.set(i, fas.count(i));
- }
- void resetlogfa(const int* fara, size_t fara_size)
- {
- ScopedLock gg(srt_logger_config.mutex);
- srt_logger_config.enabled_fa.reset();
- for (const int* i = fara; i != fara + fara_size; ++i)
- srt_logger_config.enabled_fa.set(*i, true);
- }
- void setlogstream(std::ostream& stream)
- {
- ScopedLock gg(srt_logger_config.mutex);
- srt_logger_config.log_stream = &stream;
- }
- void setloghandler(void* opaque, SRT_LOG_HANDLER_FN* handler)
- {
- ScopedLock gg(srt_logger_config.mutex);
- srt_logger_config.loghandler_opaque = opaque;
- srt_logger_config.loghandler_fn = handler;
- }
- void setlogflags(int flags)
- {
- ScopedLock gg(srt_logger_config.mutex);
- srt_logger_config.flags = flags;
- }
- SRT_API bool setstreamid(SRTSOCKET u, const std::string& sid)
- {
- return CUDT::setstreamid(u, sid);
- }
- SRT_API std::string getstreamid(SRTSOCKET u)
- {
- return CUDT::getstreamid(u);
- }
- int getrejectreason(SRTSOCKET u)
- {
- return CUDT::rejectReason(u);
- }
- int setrejectreason(SRTSOCKET u, int value)
- {
- return CUDT::rejectReason(u, value);
- }
- } // namespace srt
|