network.py 67 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2015
  5. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. from distutils import spawn
  22. import asyncio
  23. import subprocess
  24. import sys
  25. import time
  26. import unittest
  27. import qubes.tests
  28. import qubes.firewall
  29. import qubes.vm.qubesvm
  30. import qubes.vm.appvm
  31. # noinspection PyAttributeOutsideInit,PyPep8Naming
  32. class VmNetworkingMixin(object):
  33. test_ip = '192.168.123.45'
  34. test_name = 'test.example.com'
  35. ping_cmd = 'ping -W 1 -n -c 1 {target}'
  36. ping_ip = ping_cmd.format(target=test_ip)
  37. ping_name = ping_cmd.format(target=test_name)
  38. # filled by load_tests
  39. template = None
  40. def run_cmd(self, vm, cmd, user="root"):
  41. '''Run a command *cmd* in a *vm* as *user*. Return its exit code.
  42. :type self: qubes.tests.SystemTestCase | VmNetworkingMixin
  43. :param qubes.vm.qubesvm.QubesVM vm: VM object to run command in
  44. :param str cmd: command to execute
  45. :param std user: user to execute command as
  46. :return int: command exit code
  47. '''
  48. try:
  49. self.loop.run_until_complete(vm.run_for_stdio(cmd, user=user))
  50. except subprocess.CalledProcessError as e:
  51. return e.returncode
  52. return 0
  53. def setUp(self):
  54. '''
  55. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  56. '''
  57. super(VmNetworkingMixin, self).setUp()
  58. if self.template.startswith('whonix-'):
  59. self.skipTest("Test not supported here - Whonix uses its own "
  60. "firewall settings")
  61. if self.template.endswith('-minimal'):
  62. self.skipTest(
  63. "Test not supported here - minimal template don't have "
  64. "networking packages by default")
  65. self.init_default_template(self.template)
  66. self.testnetvm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  67. name=self.make_vm_name('netvm1'),
  68. label='red')
  69. self.loop.run_until_complete(self.testnetvm.create_on_disk())
  70. self.testnetvm.provides_network = True
  71. self.testnetvm.netvm = None
  72. self.testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  73. name=self.make_vm_name('vm1'),
  74. label='red')
  75. self.loop.run_until_complete(self.testvm1.create_on_disk())
  76. self.testvm1.netvm = self.testnetvm
  77. self.app.save()
  78. self.configure_netvm()
  79. def configure_netvm(self):
  80. '''
  81. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  82. '''
  83. def run_netvm_cmd(cmd):
  84. try:
  85. self.loop.run_until_complete(
  86. self.testnetvm.run_for_stdio(cmd, user='root'))
  87. except subprocess.CalledProcessError as e:
  88. self.fail("Command '%s' failed: %s%s" %
  89. (cmd, e.stdout.decode(), e.stderr.decode()))
  90. if not self.testnetvm.is_running():
  91. self.loop.run_until_complete(self.testnetvm.start())
  92. # Ensure that dnsmasq is installed:
  93. try:
  94. self.loop.run_until_complete(self.testnetvm.run_for_stdio(
  95. 'dnsmasq --version', user='root'))
  96. except subprocess.CalledProcessError:
  97. self.skipTest("dnsmasq not installed")
  98. run_netvm_cmd("ip link add test0 type dummy")
  99. run_netvm_cmd("ip link set test0 up")
  100. run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
  101. run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT --wait".format(
  102. self.test_ip))
  103. # ignore failure
  104. self.run_cmd(self.testnetvm, "pkill dnsmasq")
  105. run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
  106. ip=self.test_ip, name=self.test_name))
  107. run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format(
  108. self.test_ip))
  109. run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
  110. def test_000_simple_networking(self):
  111. '''
  112. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  113. '''
  114. self.loop.run_until_complete(self.testvm1.start())
  115. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  116. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  117. def test_010_simple_proxyvm(self):
  118. '''
  119. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  120. '''
  121. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  122. name=self.make_vm_name('proxy'),
  123. label='red')
  124. self.proxy.provides_network = True
  125. self.proxy.netvm = self.testnetvm
  126. self.loop.run_until_complete(self.proxy.create_on_disk())
  127. self.testvm1.netvm = self.proxy
  128. self.app.save()
  129. self.loop.run_until_complete(self.testvm1.start())
  130. self.assertTrue(self.proxy.is_running())
  131. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  132. "Ping by IP from ProxyVM failed")
  133. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  134. "Ping by name from ProxyVM failed")
  135. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  136. "Ping by IP from AppVM failed")
  137. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  138. "Ping by IP from AppVM failed")
  139. @qubes.tests.expectedFailureIfTemplate('debian-7')
  140. @unittest.skipUnless(spawn.find_executable('xdotool'),
  141. "xdotool not installed")
  142. def test_020_simple_proxyvm_nm(self):
  143. '''
  144. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  145. '''
  146. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  147. name=self.make_vm_name('proxy'),
  148. label='red')
  149. self.proxy.provides_network = True
  150. self.loop.run_until_complete(self.proxy.create_on_disk())
  151. self.proxy.netvm = self.testnetvm
  152. self.proxy.features['service.network-manager'] = True
  153. self.testvm1.netvm = self.proxy
  154. self.app.save()
  155. self.loop.run_until_complete(self.testvm1.start())
  156. self.assertTrue(self.proxy.is_running())
  157. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  158. "Ping by IP failed")
  159. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  160. "Ping by name failed")
  161. # reconnect to make sure that device was configured by NM
  162. self.assertEqual(
  163. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  164. user="user"),
  165. 0, "Failed to disconnect eth0 using nmcli")
  166. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  167. "Network should be disabled, but apparently it isn't")
  168. self.assertEqual(
  169. self.run_cmd(self.proxy,
  170. 'nmcli connection up "VM uplink eth0" ifname eth0',
  171. user="user"),
  172. 0, "Failed to connect eth0 using nmcli")
  173. self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0,
  174. "Failed to wait for NM connection")
  175. # check for nm-applet presence
  176. self.assertEqual(subprocess.call([
  177. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  178. self.proxy.name)],
  179. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  180. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  181. "Ping by IP failed (after NM reconnection")
  182. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  183. "Ping by name failed (after NM reconnection)")
  184. def test_030_firewallvm_firewall(self):
  185. '''
  186. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  187. '''
  188. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  189. name=self.make_vm_name('proxy'),
  190. label='red')
  191. self.proxy.provides_network = True
  192. self.loop.run_until_complete(self.proxy.create_on_disk())
  193. self.proxy.netvm = self.testnetvm
  194. self.testvm1.netvm = self.proxy
  195. self.app.save()
  196. # block all for first
  197. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  198. self.testvm1.firewall.save()
  199. self.loop.run_until_complete(self.testvm1.start())
  200. self.assertTrue(self.proxy.is_running())
  201. server = self.loop.run_until_complete(self.testnetvm.run(
  202. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  203. try:
  204. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  205. "Ping by IP from ProxyVM failed")
  206. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  207. "Ping by name from ProxyVM failed")
  208. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  209. "Ping by IP should be blocked")
  210. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  211. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  212. "TCP connection should be blocked")
  213. # block all except ICMP
  214. self.testvm1.firewall.rules = [(
  215. qubes.firewall.Rule(None, action='accept', proto='icmp')
  216. )]
  217. self.testvm1.firewall.save()
  218. # Ugly hack b/c there is no feedback when the rules are actually
  219. # applied
  220. time.sleep(3)
  221. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  222. "Ping by IP failed (should be allowed now)")
  223. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  224. "Ping by name should be blocked")
  225. # all TCP still blocked
  226. self.testvm1.firewall.rules = [
  227. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  228. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  229. ]
  230. self.testvm1.firewall.save()
  231. # Ugly hack b/c there is no feedback when the rules are actually
  232. # applied
  233. time.sleep(3)
  234. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  235. "Ping by name failed (should be allowed now)")
  236. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  237. "TCP connection should be blocked")
  238. # block all except target
  239. self.testvm1.firewall.rules = [
  240. qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
  241. proto='tcp', dstports=1234),
  242. ]
  243. self.testvm1.firewall.save()
  244. # Ugly hack b/c there is no feedback when the rules are actually
  245. # applied
  246. time.sleep(3)
  247. self.assertEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  248. "TCP connection failed (should be allowed now)")
  249. # allow all except target
  250. self.testvm1.firewall.rules = [
  251. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
  252. proto='tcp', dstports=1234),
  253. qubes.firewall.Rule(action='accept'),
  254. ]
  255. self.testvm1.firewall.save()
  256. # Ugly hack b/c there is no feedback when the rules are actually
  257. # applied
  258. time.sleep(3)
  259. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  260. "TCP connection should be blocked")
  261. finally:
  262. server.terminate()
  263. self.loop.run_until_complete(server.wait())
  264. def test_040_inter_vm(self):
  265. '''
  266. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  267. '''
  268. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  269. name=self.make_vm_name('proxy'),
  270. label='red')
  271. self.loop.run_until_complete(self.proxy.create_on_disk())
  272. self.proxy.provides_network = True
  273. self.proxy.netvm = self.testnetvm
  274. self.testvm1.netvm = self.proxy
  275. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  276. name=self.make_vm_name('vm2'),
  277. label='red')
  278. self.loop.run_until_complete(self.testvm2.create_on_disk())
  279. self.testvm2.netvm = self.proxy
  280. self.app.save()
  281. self.loop.run_until_complete(asyncio.wait([
  282. self.testvm1.start(),
  283. self.testvm2.start()]))
  284. self.assertNotEqual(self.run_cmd(self.testvm1,
  285. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  286. self.testvm2.netvm = self.testnetvm
  287. self.assertNotEqual(self.run_cmd(self.testvm1,
  288. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  289. self.assertNotEqual(self.run_cmd(self.testvm2,
  290. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  291. self.testvm1.netvm = self.testnetvm
  292. self.assertNotEqual(self.run_cmd(self.testvm1,
  293. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  294. self.assertNotEqual(self.run_cmd(self.testvm2,
  295. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  296. def test_050_spoof_ip(self):
  297. '''Test if VM IP spoofing is blocked
  298. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  299. '''
  300. self.loop.run_until_complete(self.testvm1.start())
  301. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  302. self.assertEqual(self.run_cmd(self.testnetvm,
  303. 'iptables -I INPUT -i vif+ ! -s {} -p icmp -j LOG'.format(
  304. self.testvm1.ip)), 0)
  305. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  306. 'ip addr flush dev eth0 && '
  307. 'ip addr add 10.137.1.128/24 dev eth0 && '
  308. 'ip route add default dev eth0',
  309. user='root'))
  310. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  311. "Spoofed ping should be blocked")
  312. try:
  313. (output, _) = self.loop.run_until_complete(
  314. self.testnetvm.run_for_stdio('iptables -nxvL INPUT',
  315. user='root'))
  316. except subprocess.CalledProcessError:
  317. self.fail('iptables -nxvL INPUT failed')
  318. output = output.decode().splitlines()
  319. packets = output[2].lstrip().split()[0]
  320. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  321. def test_100_late_xldevd_startup(self):
  322. '''Regression test for #1990
  323. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  324. '''
  325. # Simulater late xl devd startup
  326. cmd = "systemctl stop xendriverdomain"
  327. if self.run_cmd(self.testnetvm, cmd) != 0:
  328. self.fail("Command '%s' failed" % cmd)
  329. self.loop.run_until_complete(self.testvm1.start())
  330. cmd = "systemctl start xendriverdomain"
  331. if self.run_cmd(self.testnetvm, cmd) != 0:
  332. self.fail("Command '%s' failed" % cmd)
  333. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  334. def test_110_dynamic_attach(self):
  335. self.testvm1.netvm = None
  336. self.loop.run_until_complete(self.testvm1.start())
  337. self.testvm1.netvm = self.testnetvm
  338. # wait for it to settle down
  339. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  340. 'udevadm settle'))
  341. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  342. def test_111_dynamic_detach_attach(self):
  343. self.loop.run_until_complete(self.testvm1.start())
  344. self.testvm1.netvm = None
  345. # wait for it to settle down
  346. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  347. 'udevadm settle'))
  348. self.testvm1.netvm = self.testnetvm
  349. # wait for it to settle down
  350. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  351. 'udevadm settle'))
  352. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  353. def test_112_reattach_after_provider_shutdown(self):
  354. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  355. name=self.make_vm_name('proxy'),
  356. label='red')
  357. self.proxy.provides_network = True
  358. self.proxy.netvm = self.testnetvm
  359. self.loop.run_until_complete(self.proxy.create_on_disk())
  360. self.testvm1.netvm = self.proxy
  361. self.loop.run_until_complete(self.testvm1.start())
  362. self.loop.run_until_complete(self.proxy.shutdown(force=True, wait=True))
  363. self.loop.run_until_complete(self.proxy.start())
  364. # wait for it to settle down
  365. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  366. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  367. def test_113_reattach_after_provider_kill(self):
  368. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  369. name=self.make_vm_name('proxy'),
  370. label='red')
  371. self.proxy.provides_network = True
  372. self.proxy.netvm = self.testnetvm
  373. self.loop.run_until_complete(self.proxy.create_on_disk())
  374. self.testvm1.netvm = self.proxy
  375. self.loop.run_until_complete(self.testvm1.start())
  376. self.loop.run_until_complete(self.proxy.kill())
  377. self.loop.run_until_complete(self.proxy.start())
  378. # wait for it to settle down
  379. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  380. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  381. def test_114_reattach_after_provider_crash(self):
  382. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  383. name=self.make_vm_name('proxy'),
  384. label='red')
  385. self.proxy.provides_network = True
  386. self.proxy.netvm = self.testnetvm
  387. self.loop.run_until_complete(self.proxy.create_on_disk())
  388. self.testvm1.netvm = self.proxy
  389. self.loop.run_until_complete(self.testvm1.start())
  390. p = self.loop.run_until_complete(self.proxy.run(
  391. 'echo c > /proc/sysrq-trigger', user='root'))
  392. self.loop.run_until_complete(p.wait())
  393. timeout = 10
  394. while self.proxy.is_running():
  395. self.loop.run_until_complete(asyncio.sleep(1))
  396. timeout -= 1
  397. self.assertGreater(timeout, 0,
  398. 'timeout waiting for crash cleanup')
  399. self.loop.run_until_complete(self.proxy.start())
  400. # wait for it to settle down
  401. self.loop.run_until_complete(self.wait_for_session(self.proxy))
  402. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  403. def test_200_fake_ip_simple(self):
  404. '''Test hiding VM real IP
  405. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  406. '''
  407. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  408. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  409. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  410. self.app.save()
  411. self.loop.run_until_complete(self.testvm1.start())
  412. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  413. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  414. try:
  415. (output, _) = self.loop.run_until_complete(
  416. self.testvm1.run_for_stdio(
  417. 'ip addr show dev eth0', user='root'))
  418. except subprocess.CalledProcessError:
  419. self.fail('ip addr show dev eth0 failed')
  420. output = output.decode()
  421. self.assertIn('192.168.1.128', output)
  422. self.assertNotIn(str(self.testvm1.ip), output)
  423. try:
  424. (output, _) = self.loop.run_until_complete(
  425. self.testvm1.run_for_stdio('ip route show', user='root'))
  426. except subprocess.CalledProcessError:
  427. self.fail('ip route show failed')
  428. output = output.decode()
  429. self.assertIn('192.168.1.1', output)
  430. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  431. def test_201_fake_ip_without_gw(self):
  432. '''Test hiding VM real IP
  433. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  434. '''
  435. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  436. self.app.save()
  437. self.loop.run_until_complete(self.testvm1.start())
  438. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  439. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  440. try:
  441. (output, _) = self.loop.run_until_complete(
  442. self.testvm1.run_for_stdio('ip addr show dev eth0',
  443. user='root'))
  444. except subprocess.CalledProcessError:
  445. self.fail('ip addr show dev eth0 failed')
  446. output = output.decode()
  447. self.assertIn('192.168.1.128', output)
  448. self.assertNotIn(str(self.testvm1.ip), output)
  449. def test_202_fake_ip_firewall(self):
  450. '''Test hiding VM real IP, firewall
  451. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  452. '''
  453. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  454. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  455. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  456. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  457. name=self.make_vm_name('proxy'),
  458. label='red')
  459. self.proxy.provides_network = True
  460. self.loop.run_until_complete(self.proxy.create_on_disk())
  461. self.proxy.netvm = self.testnetvm
  462. self.testvm1.netvm = self.proxy
  463. self.app.save()
  464. # block all but ICMP and DNS
  465. self.testvm1.firewall.rules = [
  466. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  467. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  468. ]
  469. self.testvm1.firewall.save()
  470. self.loop.run_until_complete(self.testvm1.start())
  471. self.assertTrue(self.proxy.is_running())
  472. server = self.loop.run_until_complete(self.testnetvm.run(
  473. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  474. try:
  475. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  476. "Ping by IP from ProxyVM failed")
  477. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  478. "Ping by name from ProxyVM failed")
  479. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  480. "Ping by IP should be allowed")
  481. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  482. "Ping by name should be allowed")
  483. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  484. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  485. "TCP connection should be blocked")
  486. finally:
  487. server.terminate()
  488. self.loop.run_until_complete(server.wait())
  489. def test_203_fake_ip_inter_vm_allow(self):
  490. '''Access VM with "fake IP" from other VM (when firewall allows)
  491. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  492. '''
  493. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  494. name=self.make_vm_name('proxy'),
  495. label='red')
  496. self.loop.run_until_complete(self.proxy.create_on_disk())
  497. self.proxy.provides_network = True
  498. self.proxy.netvm = self.testnetvm
  499. self.testvm1.netvm = self.proxy
  500. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  501. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  502. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  503. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  504. name=self.make_vm_name('vm2'),
  505. label='red')
  506. self.loop.run_until_complete(self.testvm2.create_on_disk())
  507. self.testvm2.netvm = self.proxy
  508. self.app.save()
  509. self.loop.run_until_complete(self.testvm1.start())
  510. self.loop.run_until_complete(self.testvm2.start())
  511. cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
  512. self.testvm2.ip, self.testvm1.ip)
  513. try:
  514. self.loop.run_until_complete(self.proxy.run_for_stdio(
  515. cmd, user='root'))
  516. except subprocess.CalledProcessError as e:
  517. raise AssertionError(
  518. '{} failed with: {}'.format(cmd, e.returncode)) from None
  519. try:
  520. cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format(self.testvm2.ip)
  521. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  522. cmd, user='root'))
  523. except subprocess.CalledProcessError as e:
  524. raise AssertionError(
  525. '{} failed with: {}'.format(cmd, e.returncode)) from None
  526. self.assertEqual(self.run_cmd(self.testvm2,
  527. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  528. try:
  529. cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip)
  530. (stdout, _) = self.loop.run_until_complete(
  531. self.testvm1.run_for_stdio(cmd, user='root'))
  532. except subprocess.CalledProcessError as e:
  533. raise AssertionError(
  534. '{} failed with {}'.format(cmd, e.returncode)) from None
  535. self.assertNotEqual(stdout.decode().split()[0], '0',
  536. 'Packets didn\'t managed to the VM')
  537. def test_204_fake_ip_proxy(self):
  538. '''Test hiding VM real IP
  539. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  540. '''
  541. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  542. name=self.make_vm_name('proxy'),
  543. label='red')
  544. self.loop.run_until_complete(self.proxy.create_on_disk())
  545. self.proxy.provides_network = True
  546. self.proxy.netvm = self.testnetvm
  547. self.proxy.features['net.fake-ip'] = '192.168.1.128'
  548. self.proxy.features['net.fake-gateway'] = '192.168.1.1'
  549. self.proxy.features['net.fake-netmask'] = '255.255.255.0'
  550. self.testvm1.netvm = self.proxy
  551. self.app.save()
  552. self.loop.run_until_complete(self.testvm1.start())
  553. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0)
  554. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0)
  555. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  556. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  557. try:
  558. (output, _) = self.loop.run_until_complete(
  559. self.proxy.run_for_stdio(
  560. 'ip addr show dev eth0', user='root'))
  561. except subprocess.CalledProcessError:
  562. self.fail('ip addr show dev eth0 failed')
  563. output = output.decode()
  564. self.assertIn('192.168.1.128', output)
  565. self.assertNotIn(str(self.testvm1.ip), output)
  566. try:
  567. (output, _) = self.loop.run_until_complete(
  568. self.proxy.run_for_stdio(
  569. 'ip route show', user='root'))
  570. except subprocess.CalledProcessError:
  571. self.fail('ip route show failed')
  572. output = output.decode()
  573. self.assertIn('192.168.1.1', output)
  574. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  575. try:
  576. (output, _) = self.loop.run_until_complete(
  577. self.testvm1.run_for_stdio(
  578. 'ip addr show dev eth0', user='root'))
  579. except subprocess.CalledProcessError:
  580. self.fail('ip addr show dev eth0 failed')
  581. output = output.decode()
  582. self.assertNotIn('192.168.1.128', output)
  583. self.assertIn(str(self.testvm1.ip), output)
  584. try:
  585. (output, _) = self.loop.run_until_complete(
  586. self.testvm1.run_for_stdio(
  587. 'ip route show', user='root'))
  588. except subprocess.CalledProcessError:
  589. self.fail('ip route show failed')
  590. output = output.decode()
  591. self.assertIn('192.168.1.128', output)
  592. self.assertNotIn(str(self.proxy.ip), output)
  593. def test_210_custom_ip_simple(self):
  594. '''Custom AppVM IP
  595. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  596. '''
  597. self.testvm1.ip = '192.168.1.1'
  598. self.app.save()
  599. self.loop.run_until_complete(self.testvm1.start())
  600. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  601. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  602. def test_211_custom_ip_proxy(self):
  603. '''Custom ProxyVM IP
  604. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  605. '''
  606. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  607. name=self.make_vm_name('proxy'),
  608. label='red')
  609. self.loop.run_until_complete(self.proxy.create_on_disk())
  610. self.proxy.provides_network = True
  611. self.proxy.netvm = self.testnetvm
  612. self.proxy.ip = '192.168.1.1'
  613. self.testvm1.netvm = self.proxy
  614. self.app.save()
  615. self.loop.run_until_complete(self.testvm1.start())
  616. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  617. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  618. def test_212_custom_ip_firewall(self):
  619. '''Custom VM IP and firewall
  620. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  621. '''
  622. self.testvm1.ip = '192.168.1.1'
  623. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  624. name=self.make_vm_name('proxy'),
  625. label='red')
  626. self.proxy.provides_network = True
  627. self.loop.run_until_complete(self.proxy.create_on_disk())
  628. self.proxy.netvm = self.testnetvm
  629. self.testvm1.netvm = self.proxy
  630. self.app.save()
  631. # block all but ICMP and DNS
  632. self.testvm1.firewall.rules = [
  633. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  634. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  635. ]
  636. self.testvm1.firewall.save()
  637. self.loop.run_until_complete(self.testvm1.start())
  638. self.assertTrue(self.proxy.is_running())
  639. server = self.loop.run_until_complete(self.testnetvm.run(
  640. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  641. try:
  642. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  643. "Ping by IP from ProxyVM failed")
  644. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  645. "Ping by name from ProxyVM failed")
  646. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  647. "Ping by IP should be allowed")
  648. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  649. "Ping by name should be allowed")
  650. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  651. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  652. "TCP connection should be blocked")
  653. finally:
  654. server.terminate()
  655. self.loop.run_until_complete(server.wait())
  656. # noinspection PyAttributeOutsideInit,PyPep8Naming
  657. class VmIPv6NetworkingMixin(VmNetworkingMixin):
  658. test_ip6 = '2000:abcd::1'
  659. ping6_cmd = 'ping6 -W 1 -n -c 1 {target}'
  660. def setUp(self):
  661. super(VmIPv6NetworkingMixin, self).setUp()
  662. self.ping6_ip = self.ping6_cmd.format(target=self.test_ip6)
  663. self.ping6_name = self.ping6_cmd.format(target=self.test_name)
  664. def configure_netvm(self):
  665. '''
  666. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  667. '''
  668. self.testnetvm.features['ipv6'] = True
  669. super(VmIPv6NetworkingMixin, self).configure_netvm()
  670. def run_netvm_cmd(cmd):
  671. try:
  672. self.loop.run_until_complete(
  673. self.testnetvm.run_for_stdio(cmd, user='root'))
  674. except subprocess.CalledProcessError as e:
  675. self.fail("Command '%s' failed: %s%s" %
  676. (cmd, e.stdout.decode(), e.stderr.decode()))
  677. run_netvm_cmd("ip addr add {}/128 dev test0".format(self.test_ip6))
  678. run_netvm_cmd(
  679. "ip6tables -I INPUT -d {} -j ACCEPT".format(self.test_ip6))
  680. # ignore failure
  681. self.run_cmd(self.testnetvm, "pkill dnsmasq")
  682. run_netvm_cmd(
  683. "dnsmasq -a {ip} -A /{name}/{ip} -A /{name}/{ip6} -i test0 -z".
  684. format(ip=self.test_ip, ip6=self.test_ip6, name=self.test_name))
  685. def test_500_ipv6_simple_networking(self):
  686. '''
  687. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  688. '''
  689. self.loop.run_until_complete(self.testvm1.start())
  690. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  691. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  692. def test_510_ipv6_simple_proxyvm(self):
  693. '''
  694. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  695. '''
  696. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  697. name=self.make_vm_name('proxy'),
  698. label='red')
  699. self.proxy.provides_network = True
  700. self.proxy.netvm = self.testnetvm
  701. self.loop.run_until_complete(self.proxy.create_on_disk())
  702. self.testvm1.netvm = self.proxy
  703. self.app.save()
  704. self.loop.run_until_complete(self.testvm1.start())
  705. self.assertTrue(self.proxy.is_running())
  706. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  707. "Ping by IP from ProxyVM failed")
  708. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  709. "Ping by name from ProxyVM failed")
  710. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  711. "Ping by IP from AppVM failed")
  712. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  713. "Ping by IP from AppVM failed")
  714. @qubes.tests.expectedFailureIfTemplate('debian-7')
  715. @unittest.skipUnless(spawn.find_executable('xdotool'),
  716. "xdotool not installed")
  717. def test_520_ipv6_simple_proxyvm_nm(self):
  718. '''
  719. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  720. '''
  721. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  722. name=self.make_vm_name('proxy'),
  723. label='red')
  724. self.proxy.provides_network = True
  725. self.loop.run_until_complete(self.proxy.create_on_disk())
  726. self.proxy.netvm = self.testnetvm
  727. self.proxy.features['service.network-manager'] = True
  728. self.testvm1.netvm = self.proxy
  729. self.app.save()
  730. self.loop.run_until_complete(self.testvm1.start())
  731. self.assertTrue(self.proxy.is_running())
  732. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  733. "Ping by IP failed")
  734. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  735. "Ping by name failed")
  736. # reconnect to make sure that device was configured by NM
  737. self.assertEqual(
  738. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  739. user="user"),
  740. 0, "Failed to disconnect eth0 using nmcli")
  741. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  742. "Network should be disabled, but apparently it isn't")
  743. self.assertEqual(
  744. self.run_cmd(self.proxy,
  745. 'nmcli connection up "VM uplink eth0" ifname eth0',
  746. user="user"),
  747. 0, "Failed to connect eth0 using nmcli")
  748. self.assertEqual(self.run_cmd(self.proxy, "nm-online",
  749. user="user"), 0,
  750. "Failed to wait for NM connection")
  751. # wait for duplicate-address-detection to complete - by default it has
  752. # 1s timeout
  753. time.sleep(2)
  754. # check for nm-applet presence
  755. self.assertEqual(subprocess.call([
  756. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  757. self.proxy.name)],
  758. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  759. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  760. "Ping by IP failed (after NM reconnection")
  761. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  762. "Ping by name failed (after NM reconnection)")
  763. def test_530_ipv6_firewallvm_firewall(self):
  764. '''
  765. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  766. '''
  767. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  768. name=self.make_vm_name('proxy'),
  769. label='red')
  770. self.proxy.provides_network = True
  771. self.loop.run_until_complete(self.proxy.create_on_disk())
  772. self.proxy.netvm = self.testnetvm
  773. self.testvm1.netvm = self.proxy
  774. self.app.save()
  775. # block all for first
  776. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  777. self.testvm1.firewall.save()
  778. self.loop.run_until_complete(self.testvm1.start())
  779. self.assertTrue(self.proxy.is_running())
  780. server = self.loop.run_until_complete(self.testnetvm.run(
  781. 'socat TCP6-LISTEN:1234,fork EXEC:/bin/uname'))
  782. try:
  783. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  784. "Ping by IP from ProxyVM failed")
  785. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  786. "Ping by name from ProxyVM failed")
  787. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  788. "Ping by IP should be blocked")
  789. client6_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
  790. client4_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  791. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  792. "TCP connection should be blocked")
  793. # block all except ICMP
  794. self.testvm1.firewall.rules = [(
  795. qubes.firewall.Rule(None, action='accept', proto='icmp')
  796. )]
  797. self.testvm1.firewall.save()
  798. # Ugly hack b/c there is no feedback when the rules are actually
  799. # applied
  800. time.sleep(3)
  801. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  802. "Ping by IP failed (should be allowed now)")
  803. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  804. "Ping by name should be blocked")
  805. # all TCP still blocked
  806. self.testvm1.firewall.rules = [
  807. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  808. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  809. ]
  810. self.testvm1.firewall.save()
  811. # Ugly hack b/c there is no feedback when the rules are actually
  812. # applied
  813. time.sleep(3)
  814. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  815. "Ping by name failed (should be allowed now)")
  816. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  817. "TCP connection should be blocked")
  818. # block all except target
  819. self.testvm1.firewall.rules = [
  820. qubes.firewall.Rule(None, action='accept',
  821. dsthost=self.test_ip6,
  822. proto='tcp', dstports=1234),
  823. ]
  824. self.testvm1.firewall.save()
  825. # Ugly hack b/c there is no feedback when the rules are actually
  826. # applied
  827. time.sleep(3)
  828. self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  829. "TCP connection failed (should be allowed now)")
  830. # block all except target - by name
  831. self.testvm1.firewall.rules = [
  832. qubes.firewall.Rule(None, action='accept',
  833. dsthost=self.test_name,
  834. proto='tcp', dstports=1234),
  835. ]
  836. self.testvm1.firewall.save()
  837. # Ugly hack b/c there is no feedback when the rules are actually
  838. # applied
  839. time.sleep(3)
  840. self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  841. "TCP (IPv6) connection failed (should be allowed now)")
  842. self.assertEqual(self.run_cmd(self.testvm1, client4_cmd),
  843. 0,
  844. "TCP (IPv4) connection failed (should be allowed now)")
  845. # allow all except target
  846. self.testvm1.firewall.rules = [
  847. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip6,
  848. proto='tcp', dstports=1234),
  849. qubes.firewall.Rule(action='accept'),
  850. ]
  851. self.testvm1.firewall.save()
  852. # Ugly hack b/c there is no feedback when the rules are actually
  853. # applied
  854. time.sleep(3)
  855. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  856. "TCP connection should be blocked")
  857. finally:
  858. server.terminate()
  859. self.loop.run_until_complete(server.wait())
  860. def test_540_ipv6_inter_vm(self):
  861. '''
  862. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  863. '''
  864. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  865. name=self.make_vm_name('proxy'),
  866. label='red')
  867. self.loop.run_until_complete(self.proxy.create_on_disk())
  868. self.proxy.provides_network = True
  869. self.proxy.netvm = self.testnetvm
  870. self.testvm1.netvm = self.proxy
  871. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  872. name=self.make_vm_name('vm2'),
  873. label='red')
  874. self.loop.run_until_complete(self.testvm2.create_on_disk())
  875. self.testvm2.netvm = self.proxy
  876. self.app.save()
  877. self.loop.run_until_complete(asyncio.wait([
  878. self.testvm1.start(),
  879. self.testvm2.start()]))
  880. self.assertNotEqual(self.run_cmd(self.testvm1,
  881. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  882. self.testvm2.netvm = self.testnetvm
  883. self.assertNotEqual(self.run_cmd(self.testvm1,
  884. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  885. self.assertNotEqual(self.run_cmd(self.testvm2,
  886. self.ping_cmd.format(target=self.testvm1.ip6)), 0)
  887. self.testvm1.netvm = self.testnetvm
  888. self.assertNotEqual(self.run_cmd(self.testvm1,
  889. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  890. self.assertNotEqual(self.run_cmd(self.testvm2,
  891. self.ping_cmd.format(target=self.testvm1.ip6)), 0)
  892. def test_550_ipv6_spoof_ip(self):
  893. '''Test if VM IP spoofing is blocked
  894. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  895. '''
  896. self.loop.run_until_complete(self.testvm1.start())
  897. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  898. # add a simple rule counting packets
  899. self.assertEqual(self.run_cmd(self.testnetvm,
  900. 'ip6tables -I INPUT -i vif+ ! -s {} -p icmpv6 -j LOG'.format(
  901. self.testvm1.ip6)), 0)
  902. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  903. 'ip -6 addr flush dev eth0 && '
  904. 'ip -6 addr add {}/128 dev eth0 && '
  905. 'ip -6 route add default via {} dev eth0'.format(
  906. str(self.testvm1.visible_ip6) + '1',
  907. str(self.testvm1.visible_gateway6)),
  908. user='root'))
  909. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  910. "Spoofed ping should be blocked")
  911. try:
  912. (output, _) = self.loop.run_until_complete(
  913. self.testnetvm.run_for_stdio('ip6tables -nxvL INPUT',
  914. user='root'))
  915. except subprocess.CalledProcessError:
  916. self.fail('ip6tables -nxvL INPUT failed')
  917. output = output.decode().splitlines()
  918. packets = output[2].lstrip().split()[0]
  919. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  920. def test_710_ipv6_custom_ip_simple(self):
  921. '''Custom AppVM IP
  922. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  923. '''
  924. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  925. self.app.save()
  926. self.loop.run_until_complete(self.testvm1.start())
  927. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  928. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  929. def test_711_ipv6_custom_ip_proxy(self):
  930. '''Custom ProxyVM IP
  931. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  932. '''
  933. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  934. name=self.make_vm_name('proxy'),
  935. label='red')
  936. self.loop.run_until_complete(self.proxy.create_on_disk())
  937. self.proxy.provides_network = True
  938. self.proxy.netvm = self.testnetvm
  939. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  940. self.testvm1.netvm = self.proxy
  941. self.app.save()
  942. self.loop.run_until_complete(self.testvm1.start())
  943. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  944. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  945. def test_712_ipv6_custom_ip_firewall(self):
  946. '''Custom VM IP and firewall
  947. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  948. '''
  949. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  950. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  951. name=self.make_vm_name('proxy'),
  952. label='red')
  953. self.proxy.provides_network = True
  954. self.loop.run_until_complete(self.proxy.create_on_disk())
  955. self.proxy.netvm = self.testnetvm
  956. self.testvm1.netvm = self.proxy
  957. self.app.save()
  958. # block all but ICMP and DNS
  959. self.testvm1.firewall.rules = [
  960. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  961. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  962. ]
  963. self.testvm1.firewall.save()
  964. self.loop.run_until_complete(self.testvm1.start())
  965. self.assertTrue(self.proxy.is_running())
  966. server = self.loop.run_until_complete(self.testnetvm.run(
  967. 'socat TCP6-LISTEN:1234,fork EXEC:/bin/uname'))
  968. try:
  969. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  970. "Ping by IP from ProxyVM failed")
  971. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  972. "Ping by name from ProxyVM failed")
  973. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  974. "Ping by IP should be allowed")
  975. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  976. "Ping by name should be allowed")
  977. client_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
  978. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  979. "TCP connection should be blocked")
  980. finally:
  981. server.terminate()
  982. self.loop.run_until_complete(server.wait())
  983. # noinspection PyAttributeOutsideInit,PyPep8Naming
  984. class VmUpdatesMixin(object):
  985. """
  986. Tests for VM updates
  987. """
  988. # filled by load_tests
  989. template = None
  990. # made this way to work also when no package build tools are installed
  991. """
  992. $ cat test-pkg.spec:
  993. Name: test-pkg
  994. Version: 1.0
  995. Release: 1%{?dist}
  996. Summary: Test package
  997. Group: System
  998. License: GPL
  999. URL: http://example.com/
  1000. %description
  1001. Test package
  1002. %files
  1003. %changelog
  1004. $ rpmbuild -bb test-pkg.spec
  1005. $ cat test-pkg-1.0-1.fc21.x86_64.rpm | gzip | base64
  1006. $ cat test-pkg-1.1-1.fc21.x86_64.rpm | gzip | base64
  1007. """
  1008. RPM_PACKAGE_GZIP_BASE64 = [(
  1009. b"H4sIAPzRLlYAA+2Y728URRjHn7ueUCkERKJVJDnTxLSxs7293o8WOER6ljYYrtKCLUSa3"
  1010. b"bnZ64bd22VmTq8nr4wJbwxvjNHIG0x8oTHGGCHB8AcYE1/0lS80GgmQFCJU3wgB4ZjdfZ"
  1011. b"q2xDe8NNlvMjfzmeeZH7tPbl98b35169cOUEpIJiTxT9SIrmVUs2hWh8dUAp54dOrM14s"
  1012. b"JHK4D2DKl+j2qrVfjsuq3qEWbohjuAB2Lqk+p1o/8Z5QPmSi/YwnjezH+F8bLQZjqllW0"
  1013. b"hvODRmFIL5hFk9JMXi/mi5ZuDleNwSEzP5wtmLnouNQnm3/6fndz7FLt9M/Hruj37gav4"
  1014. b"tTjPnasWLFixYoVK1asWLFixYoV63+p0KNot9vnIPQc1vgYOwCSgXfxCoS+QzKHOVXVOj"
  1015. b"Fn2ccIfI0k8nXkLuQbyJthxed4UrVnkG8i9yDfgsj3yCAv4foc8t+w1hf5B+Nl5Du43xj"
  1016. b"yvxivIN9HpsgPkO2IU9uQfeRn8Xk/iJ4x1Y3nfxH1qecwfhH5+YgT25F7o/0SRdxvOppP"
  1017. b"7MX9ZjB/DNnE/OOYX404uRGZIT+FbCFvQ3aQ8f0+/WF0XjJ8nyOw7H+BrmUA/a8pNZf2D"
  1018. b"XrCqLG1cERbWHI8ajhznpBY9P0Tr8PkvJDMhTkp/Z0DA6xpuL7DNOq5A+DY9UYTmkOF2U"
  1019. b"IO/sNt0wSnGvfdlZssD3rVIlLI9UUX37C6qXzHNntHPNfnTAhWHbUddtBwmegDjAUzZbu"
  1020. b"m9lqZmzDmHc8Ik8WY8Tab4Myym4+Gx8V0qw8GtYyWIzrktEJwV9UHv3ktG471rAqHTmFQ"
  1021. b"685V5uGqIalk06SWJr7tszR503Ac9cs493jJ8rhrSCIYbXBbzqt5v5+UZ0crh6bGR2dmJ"
  1022. b"yuHD428VlLLLdakzJe2VxcKhFSFID73JKPS40RI7tXVCcQ3uOGWhPCJ2bAspiJ2i5Vy6n"
  1023. b"jOqMerpEYpEe/Yks4xkU4Tt6BirmzUWanG6ozbFKhve9BsQRaLRTirzqk7hgUktXojKnf"
  1024. b"n8jeg3X4QepP3i63po6oml+9t/CwJLya2Bn/ei6f7/4B3Ycdb0L3pt5Q5mNz16rWJ9fLk"
  1025. b"vvOff/nxS7//8O2P2gvt7nDDnoV9L1du9N4+ucjl9u/8+a7dC5Nnvjlv9Ox5r+v9Cy0NE"
  1026. b"m+c6rv60S/dZw98Gn6MNswcfQiWUvg3wBUAAA=="
  1027. ), (
  1028. b"H4sIAMY1B1wCA+2Y72scRRjH537U1MZorKLRVjgJSIKdvf11u3dq0jZJ0wRLL+1VvBRrn"
  1029. b"J2dvVu6t7vd3bN3sS9ECr6RIkgR9JXQF5aiIk1L/gJF8EVe+KqiKLQQi03tmypojXO3zz"
  1030. b"Vp8YW+3y/MPvOZ55lnZthhXjw3Lqx9n0FcqYiFEfaP17AkSLxZVJbQ/1QKbbl/6Mxnqyn"
  1031. b"o9iE0uMTtOPTPcTvIJw1w+8DdDCj1KPBozJlVbrO8OcC/xvORH8/P3AT/2+D/DfynuTtH"
  1032. b"FKtANKNo6KpKJIs3jSkl3VIkvSAWSiZTlCL3akhXZCKKKmPcaRRJqURFS2MlSTMsgyqMz"
  1033. b"6RUp8SQFcmixZJpMlEWi0w1da3Eu3p3+1uW1saPHfpWOSvNXtruDVx4+A0+eAolSpQoUa"
  1034. b"JEiRIlSpQoUaJEiRJBTWR9ff191K1p3FM3ySGUEbndjbp1jUwOYkzetkJMr07SqZukgX8"
  1035. b"B7ge+DvwI2qijPMjbE8A3gIeB11BcVxGBb8J8FfgW+PcA3wb/FPAfkG8G+C/wl4HvAFPg"
  1036. b"v4HtmLOPA/vAT8J534vPmB2C9T+NbfYp8C8DPx1zagfwSJwvpUO+ajye2gP55iF+BtiA+"
  1037. b"Nch3ow5/TkwA74IbAFfBnaAl2N+7IN4vfQK8Ffg/w74arx++grwtTg+s7PDk6hXn0OSIC"
  1038. b"Gozx3hYzmf0OOkxu6F1/oKlx2PEqfuhRFckv1zB1ClHUasgepR5L+Qz7MWafgOE6jXyCP"
  1039. b"Hdpst1CpqC5qK/qUaKIQBFQK/sbGTXmeET8KaCgW7bZsbj3dsY2TSa/gBC0NmTtsOO0ga"
  1040. b"LBxF4OuMTNk1nmtjbI60HY90g8MZ8iabC5hlt+53z4bVxVGkCKKgYgmpgiaIXdv5FgS52"
  1041. b"5dUQY6P37kbWzcVNzd1cVnO4VoO+7bPcvhV4jj8y4LAC8YsL2iQCIeMNgM7avNxfxeeWp"
  1042. b"guHz4yOz2/UCm/cnhy35jcG99/YHZislpd2Fup7OMR5YOVHLZYizI/sj035BBG/BdhP/A"
  1043. b"iRiMvwGEUeC5fuxYw6gUmrlGKw5N2ROuMh4c+o+FYvhkGeX7wPD9/PmBmnURgcJ0EJnOZ"
  1044. b"iSmV/kM4cV3PsN04uqGp/BM1XTZW4zkCm/L9kbDt0jrfk9cMcdM9absmjojhsI3NU4eE9"
  1045. b"d4R+LG4g1qbGFHf9lBrEclwnTCs3r1iuOY2u/+jGVm4iCwiyXpJE61SkUq6RhVW0FVFpo"
  1046. b"ZZ0oiu6ppuFSxSFBXTUOQCFRmhhElFQ9XNgiyJhbv/dnf8hnaeETR4R1+sHuX37+c/H/o"
  1047. b"kjZ5Nbe88bMvv7voJvYWeOYaGBn7IGkr6xb3X5vqiExNL585/+NyPX3/5jbBzfaibcHhl"
  1048. b"4vny9ZHfT6wG0Y6Lfrv/pZXKmS+WyPD4O/2nLy0KKHXo1OjVs1eGPn75o+5DvW3+6D9jd"
  1049. b"bFaTBcAAA=="
  1050. )]
  1051. """
  1052. Minimal package generated by running dh_make on empty directory
  1053. Then cat test-pkg_1.0-1_amd64.deb | gzip | base64
  1054. Then cat test-pkg_1.1-1_amd64.deb | gzip | base64
  1055. """
  1056. DEB_PACKAGE_GZIP_BASE64 = [(
  1057. b"H4sIACTXLlYAA1O0SSxKzrDjSklNykzM003KzEssqlRQUDA0MTG1NDQwNDVTUDBQAAEIa"
  1058. b"WhgYGZioqBgogADCVxGegZcyfl5JUX5OXoliUV66VVE6DcwheuX7+ZgAAEW5rdXHb0PG4"
  1059. b"iwf5j3WfMT6zWzzMuZgoE3jjYraNzbbFKWGms0SaRw/r2SV23WZ4IdP8preM4yqf0jt95"
  1060. b"3c8qnacfNxJUkf9/w+/3X9ph2GEdgQdixrz/niHKKTnYXizf4oSC7tHOz2Zzq+/6vn8/7"
  1061. b"ezQ7c1tmi7xZ3SGJ4yzhT2dcr7V+W3zM5ZPu/56PSv4Zdok+7Yv/V/6buWaKVlFkkV58S"
  1062. b"N3GmLgnqzRmeZ3V3ymmurS5fGa85/LNx1bpZMin3S6dvXKqydp3ubP1vmyarJZb/qSh62"
  1063. b"C8oIdxqm/BtvkGDza+On/Vfv2py7/0LV7VH+qR6a+bkKUbHXt5/SG187d+nps1a5PJfMO"
  1064. b"i11dWcUe1HjwaW3Q5RHXn9LmcHy+tW9YcKf0768XVB1t3R0bKrzs5t9P+6r7rZ99svH10"
  1065. b"+Q6F/o8tf1fO/32y+fWa14eifd+WxUy0jcxYH7N9/tUvmnUZL74pW32qLeuRU+ZwYGASa"
  1066. b"GBgUWBgxM90ayy3VdmykkGDgYErJbEkERydFVWQmCMQo8aWZvAY/WteFRHFwMCYqXTPjI"
  1067. b"lBkVEMGLsl+k8XP1D/z+gXyyDOvUemlnHqAVkvu0rRQ2fUFodkN3mtU9uwhqk8V+TqPEE"
  1068. b"Nc7fzoQ4n71lqRs/7kbbT0+qOZuKH4r8mjzsc1k/YkCHN8Pjg48fbpE+teHa96LNcfu0V"
  1069. b"5n2/Z2xa2KDvaCOx8cqBFxc514uZ3TmadXS+6cpzU7wSzq5SWfapJOD9n6wLXSwtlgxZh"
  1070. b"xITzWW7buhx/bb291RcVlEfeC9K5hlrqunSzIMSZT7/Nqgc/qMvMNW227WI8ezB8mVuZh"
  1071. b"0hERJSvysfburr4Dx0I9BW57UwR4+e1gxu49PcEt8sbK18Xpvt//Hj5UYm+Zc25q+T4xl"
  1072. b"rJvxfVnh80oadq57OZxPaU1bbztv1yF365W4t45Yr+XrFzov237GVY1Zgf7NvE4+W2SuR"
  1073. b"lQtLauR1TQ/mbOiIONYya6tU1jPGpWfk/i1+ttiXe3ZO14n0YOWggndznjGlGLyfVbBC6"
  1074. b"MRP5aMM7aCco/s7sZqB8RlTQwADw8rnuT/sDHi7mUASjJFRAAbWwNLiAwAA"
  1075. ), (
  1076. b"H4sIAL05B1wCA1O0SSxKzrDjSklNykzM003KzEssqlRQUDA0NTG2NDc3NjdTUDBQAAEIa"
  1077. b"WhgYGZioqBgogADCVxGegZcyfl5JUX5OXoliUV66VVE6De3gOuX7+ZgAAEW5rdXzmbdMR"
  1078. b"BgSJj/VeQzQ+ztT/W+EVEnFraKOTlXh6+JXB8RbTRpzgWb2qdLX0+RmTRZcYlyxJutJsk"
  1079. b"/pfsfq9yqWZJ4JVVS97jBPPnz1yviluw51b0q4tnrWemCU2a/17mTUBYX0XBC6nH8rvvZ"
  1080. b"n/WP7nu40+Jlz7drPNLvCjULQkXOv677OV9s4bPsv5+tvCzPG8s57no479qV/5V/813Kh"
  1081. b"Wy3Pbj4827Jq5v6W/wk7zL1/+zbfH6btVb/3Pm5EapukaJvdgfcape/JZZWe+mZ4+Grby"
  1082. b"7UTaroPzyv9urC1W2MT9+F2bZtWJOyXfGo5dv7DGXJUzee+p930Od0j8QNceNHJffOTr2"
  1083. b"kOJe93mWG+nPdLsG6fz++MV5h1OGr0N9yf3N2ydzQ5x/E9Aw/s9xzmOpULnKtsSZqc/rr"
  1084. b"RQdf/Lu/ckKE9xU5VRuNehbzTr6789a+P2lt2zk5cFqe3N2289+j/hfH2X39/+nvc5vTW"
  1085. b"a/+83pvWqY3e93JWYsmup693HzCOPBk0LI9O7PtiqawN9y8eaTV75DLLL2dNWqTLsTsOn"
  1086. b"7wy0fTe5oLH//7eNf89Co3dRUHJmLRh20s/xhYJkoeYdBgYEhJLEkEJ4uKKkgKIJQyjI3"
  1087. b"gKeOveVVEFAMDY6bSPTMmBkVGMWAqKdF/uviB+n/GwlgGce49MrWMUw/IetlVih46o7Y4"
  1088. b"0uZe/t9lt85aMUrdWhjueTHRd1nr1uK830feH74vcPKU2pkbP4SZnta5PhC9dfPTqvv7f"
  1089. b"n068XRDRDzLuv8Oa5p1L+02ZN127vp6mzSzzFqpLkmbwyl131J1xW58YlcxXSWs0PTbpT"
  1090. b"z28ZUnE/e+NN93weAd40a/zzJ7+Re/v+R7+f3VBVFJCyZsv523ySJ12t7Nt5b8uBu8zuJ"
  1091. b"2Laer//nZCkbXlxtYXvvA8+VSVsCRpo8BawtftKWyZBjkWa6/0X7qXfbF9reH/ro6S63Y"
  1092. b"rCj8t8cltPIOj9H/8LyIxj6bMsZVVtu+ngj6MCNV5JXhOs07RXWxrb3xsqJMDRksx/5bO"
  1093. b"bNtevXz2cdpzzI19Roede4NXxAyK9Dlrtp8JtELLNPWbBe9HfJlj1Hiv69erIFBnX/Pe1"
  1094. b"4QnzLD+p2AiTc383/P+7sW3WoxnXra49iJKJeZy7gc9Z02S57qrvWW3day501VhsbPtfK"
  1095. b"C5nyBG9qjr08E59KY1vUTGRg7mRsCGBimFa+3sTPg7WYCSTBGRgEAzEOeH04EAAA="
  1096. )]
  1097. def run_cmd(self, vm, cmd, user="root"):
  1098. '''Run a command *cmd* in a *vm* as *user*. Return its exit code.
  1099. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1100. :param qubes.vm.qubesvm.QubesVM vm: VM object to run command in
  1101. :param str cmd: command to execute
  1102. :param std user: user to execute command as
  1103. :return int: command exit code
  1104. '''
  1105. try:
  1106. self.loop.run_until_complete(vm.run_for_stdio(cmd))
  1107. except subprocess.CalledProcessError as e:
  1108. return e.returncode
  1109. return 0
  1110. def assertRunCommandReturnCode(self, vm, cmd, expected_returncode):
  1111. p = self.loop.run_until_complete(
  1112. vm.run(cmd, user='root',
  1113. stdout=subprocess.PIPE, stderr=subprocess.PIPE))
  1114. (stdout, stderr) = self.loop.run_until_complete(p.communicate())
  1115. self.assertIn(
  1116. self.loop.run_until_complete(p.wait()), expected_returncode,
  1117. '{}: {}\n{}'.format(cmd, stdout, stderr))
  1118. def setUp(self):
  1119. '''
  1120. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1121. '''
  1122. if not self.template.count('debian') and \
  1123. not self.template.count('fedora'):
  1124. self.skipTest("Template {} not supported by this test".format(
  1125. self.template))
  1126. super(VmUpdatesMixin, self).setUp()
  1127. self.update_cmd = None
  1128. if self.template.count("debian"):
  1129. self.update_cmd = "set -o pipefail; apt-get update 2>&1 | " \
  1130. "{ ! grep '^W:\|^E:'; }"
  1131. self.upgrade_cmd = "apt-get -V dist-upgrade -y"
  1132. self.install_cmd = "apt-get install -y {}"
  1133. self.install_test_cmd = "dpkg -l {}"
  1134. self.exit_code_ok = [0]
  1135. elif self.template.count("fedora"):
  1136. cmd = "yum"
  1137. try:
  1138. # assume template name in form "fedora-XX-suffix"
  1139. if int(self.template.split("-")[1]) > 21:
  1140. cmd = "dnf"
  1141. except ValueError:
  1142. pass
  1143. self.update_cmd = "{cmd} clean all; {cmd} check-update".format(
  1144. cmd=cmd)
  1145. self.upgrade_cmd = "{cmd} upgrade -y".format(cmd=cmd)
  1146. self.install_cmd = cmd + " install -y {}"
  1147. self.install_test_cmd = "rpm -q {}"
  1148. self.exit_code_ok = [0, 100]
  1149. self.init_default_template(self.template)
  1150. self.init_networking()
  1151. self.testvm1 = self.app.add_new_vm(
  1152. qubes.vm.appvm.AppVM,
  1153. name=self.make_vm_name('vm1'),
  1154. label='red')
  1155. self.loop.run_until_complete(self.testvm1.create_on_disk())
  1156. def test_000_simple_update(self):
  1157. '''
  1158. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1159. '''
  1160. self.app.save()
  1161. self.testvm1 = self.app.domains[self.testvm1.qid]
  1162. self.loop.run_until_complete(self.testvm1.start())
  1163. self.assertRunCommandReturnCode(self.testvm1,
  1164. self.update_cmd, self.exit_code_ok)
  1165. def create_repo_apt(self, version=0):
  1166. '''
  1167. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1168. '''
  1169. pkg_file_name = "test-pkg_1.{}-1_amd64.deb".format(version)
  1170. self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
  1171. mkdir -p /tmp/apt-repo \
  1172. && cd /tmp/apt-repo \
  1173. && base64 -d | zcat > {}
  1174. '''.format(pkg_file_name),
  1175. input=self.DEB_PACKAGE_GZIP_BASE64[version]))
  1176. # do not assume dpkg-scanpackage installed
  1177. packages_path = "dists/test/main/binary-amd64/Packages"
  1178. self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
  1179. mkdir -p /tmp/apt-repo/dists/test/main/binary-amd64 \
  1180. && cd /tmp/apt-repo \
  1181. && cat > {packages} \
  1182. && echo MD5sum: $(openssl md5 -r {pkg} | cut -f 1 -d ' ') \
  1183. >> {packages} \
  1184. && echo SHA1: $(openssl sha1 -r {pkg} | cut -f 1 -d ' ') \
  1185. >> {packages} \
  1186. && echo SHA256: $(openssl sha256 -r {pkg} | cut -f 1 -d ' ') \
  1187. >> {packages} \
  1188. && sed -i -e "s,@SIZE@,$(stat -c %s {pkg})," {packages} \
  1189. && gzip < {packages} > {packages}.gz
  1190. '''.format(pkg=pkg_file_name, packages=packages_path),
  1191. input='''\
  1192. Package: test-pkg
  1193. Version: 1.{version}-1
  1194. Architecture: amd64
  1195. Maintainer: unknown <user@host>
  1196. Installed-Size: 25
  1197. Filename: {pkg}
  1198. Size: @SIZE@
  1199. Section: unknown
  1200. Priority: optional
  1201. Description: Test package'''.format(pkg=pkg_file_name, version=version).encode(
  1202. 'utf-8')))
  1203. self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
  1204. mkdir -p /tmp/apt-repo/dists/test \
  1205. && cd /tmp/apt-repo/dists/test \
  1206. && cat > Release \
  1207. && echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}\
  1208. >> Release \
  1209. && echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}\
  1210. >> Release
  1211. '''.format(p='main/binary-amd64/Packages',
  1212. z='main/binary-amd64/Packages.gz'),
  1213. input=b'''\
  1214. Label: Test repo
  1215. Suite: test
  1216. Codename: test
  1217. Date: Tue, 27 Oct 2015 03:22:09 UTC
  1218. Architectures: amd64
  1219. Components: main
  1220. SHA256:
  1221. '''))
  1222. def create_repo_yum(self, version=0):
  1223. '''
  1224. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1225. '''
  1226. pkg_file_name = "test-pkg-1.{}-1.fc21.x86_64.rpm".format(version)
  1227. self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
  1228. mkdir -p /tmp/yum-repo \
  1229. && cd /tmp/yum-repo \
  1230. && base64 -d | zcat > {}
  1231. '''.format(pkg_file_name), input=self.RPM_PACKAGE_GZIP_BASE64[
  1232. version]))
  1233. # createrepo is installed by default in Fedora template
  1234. self.loop.run_until_complete(self.netvm_repo.run_for_stdio(
  1235. 'createrepo /tmp/yum-repo'))
  1236. def create_repo_and_serve(self):
  1237. '''
  1238. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1239. '''
  1240. if self.template.count("debian") or self.template.count("whonix"):
  1241. self.create_repo_apt()
  1242. self.loop.run_until_complete(self.netvm_repo.run(
  1243. 'cd /tmp/apt-repo && python -m SimpleHTTPServer 8080',
  1244. stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL))
  1245. elif self.template.count("fedora"):
  1246. self.create_repo_yum()
  1247. self.loop.run_until_complete(self.netvm_repo.run(
  1248. 'cd /tmp/yum-repo && python -m SimpleHTTPServer 8080',
  1249. stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL))
  1250. else:
  1251. # not reachable...
  1252. self.skipTest("Template {} not supported by this test".format(
  1253. self.template))
  1254. def add_update_to_repo(self):
  1255. if self.template.count("debian") or self.template.count("whonix"):
  1256. self.create_repo_apt(1)
  1257. elif self.template.count("fedora"):
  1258. self.create_repo_yum(1)
  1259. def configure_test_repo(self):
  1260. """
  1261. Configure test repository in test-vm and disable rest of them.
  1262. The critical part is to use "localhost" - this will work only when
  1263. accessed through update proxy and this is exactly what we want to
  1264. test here.
  1265. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1266. """
  1267. if self.template.count("debian") or self.template.count("whonix"):
  1268. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  1269. "rm -f /etc/apt/sources.list.d/* &&"
  1270. "echo 'deb [trusted=yes] http://localhost:8080 test main' "
  1271. "> /etc/apt/sources.list",
  1272. user="root"))
  1273. elif self.template.count("fedora"):
  1274. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  1275. "rm -f /etc/yum.repos.d/*.repo &&"
  1276. "echo '[test]' > /etc/yum.repos.d/test.repo &&"
  1277. "echo 'name=Test repo' >> /etc/yum.repos.d/test.repo &&"
  1278. "echo 'gpgcheck=0' >> /etc/yum.repos.d/test.repo &&"
  1279. "echo 'baseurl=http://localhost:8080/'"
  1280. " >> /etc/yum.repos.d/test.repo",
  1281. user="root"
  1282. ))
  1283. else:
  1284. # not reachable...
  1285. self.skipTest("Template {} not supported by this test".format(
  1286. self.template))
  1287. def test_010_update_via_proxy(self):
  1288. '''
  1289. Test both whether updates proxy works and whether is actually used
  1290. by the VM
  1291. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1292. '''
  1293. if self.template.count("minimal"):
  1294. self.skipTest("Template {} not supported by this test".format(
  1295. self.template))
  1296. self.netvm_repo = self.app.add_new_vm(
  1297. qubes.vm.appvm.AppVM,
  1298. name=self.make_vm_name('net'),
  1299. label='red')
  1300. self.netvm_repo.provides_network = True
  1301. self.loop.run_until_complete(self.netvm_repo.create_on_disk())
  1302. self.testvm1.netvm = self.netvm_repo
  1303. self.netvm_repo.features['service.qubes-updates-proxy'] = True
  1304. # TODO: consider also adding a test for the template itself
  1305. self.testvm1.features['service.updates-proxy-setup'] = True
  1306. self.app.save()
  1307. # Setup test repo
  1308. self.loop.run_until_complete(self.netvm_repo.start())
  1309. self.create_repo_and_serve()
  1310. # Configure local repo
  1311. self.loop.run_until_complete(self.testvm1.start())
  1312. self.configure_test_repo()
  1313. with self.qrexec_policy('qubes.UpdatesProxy', self.testvm1,
  1314. '$default', action='allow,target=' + self.netvm_repo.name):
  1315. # update repository metadata
  1316. self.assertRunCommandReturnCode(self.testvm1,
  1317. self.update_cmd, self.exit_code_ok)
  1318. # install test package
  1319. self.assertRunCommandReturnCode(self.testvm1,
  1320. self.install_cmd.format('test-pkg'), self.exit_code_ok)
  1321. # verify if it was really installed
  1322. self.assertRunCommandReturnCode(self.testvm1,
  1323. self.install_test_cmd.format('test-pkg'), self.exit_code_ok)
  1324. def test_020_updates_available_notification(self):
  1325. # override with StandaloneVM
  1326. self.testvm1 = self.app.add_new_vm(
  1327. qubes.vm.standalonevm.StandaloneVM,
  1328. name=self.make_vm_name('vm2'),
  1329. label='red')
  1330. tpl = self.app.domains[self.template]
  1331. self.testvm1.clone_properties(tpl)
  1332. self.testvm1.features.update(tpl.features)
  1333. self.loop.run_until_complete(
  1334. self.testvm1.clone_disk_files(tpl))
  1335. self.loop.run_until_complete(self.testvm1.start())
  1336. self.netvm_repo = self.testvm1
  1337. self.create_repo_and_serve()
  1338. self.configure_test_repo()
  1339. self.loop.run_until_complete(
  1340. self.testvm1.run_for_stdio(
  1341. '/usr/lib/qubes/upgrades-status-notify',
  1342. user='root',
  1343. ))
  1344. self.assertFalse(self.testvm1.features.get('updates-available', False))
  1345. # update repository metadata
  1346. self.assertRunCommandReturnCode(
  1347. self.testvm1, self.update_cmd, self.exit_code_ok)
  1348. # install test package
  1349. self.assertRunCommandReturnCode(
  1350. self.testvm1, self.install_cmd.format('test-pkg'), self.exit_code_ok)
  1351. self.assertFalse(self.testvm1.features.get('updates-available', False))
  1352. self.add_update_to_repo()
  1353. # update repository metadata
  1354. self.assertRunCommandReturnCode(
  1355. self.testvm1, self.update_cmd, self.exit_code_ok)
  1356. self.loop.run_until_complete(
  1357. self.testvm1.run_for_stdio(
  1358. '/usr/lib/qubes/upgrades-status-notify',
  1359. user='root',
  1360. ))
  1361. self.assertTrue(self.testvm1.features.get('updates-available', False))
  1362. # install updates
  1363. self.assertRunCommandReturnCode(
  1364. self.testvm1, self.upgrade_cmd, self.exit_code_ok)
  1365. self.assertFalse(self.testvm1.features.get('updates-available', False))
  1366. def create_testcases_for_templates():
  1367. yield from qubes.tests.create_testcases_for_templates('VmNetworking',
  1368. VmNetworkingMixin, qubes.tests.SystemTestCase,
  1369. module=sys.modules[__name__])
  1370. yield from qubes.tests.create_testcases_for_templates('VmIPv6Networking',
  1371. VmIPv6NetworkingMixin, qubes.tests.SystemTestCase,
  1372. module=sys.modules[__name__])
  1373. yield from qubes.tests.create_testcases_for_templates('VmUpdates',
  1374. VmUpdatesMixin, qubes.tests.SystemTestCase,
  1375. module=sys.modules[__name__])
  1376. def load_tests(loader, tests, pattern):
  1377. tests.addTests(loader.loadTestsFromNames(
  1378. create_testcases_for_templates()))
  1379. return tests
  1380. qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)