network.py 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2015
  5. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. from distutils import spawn
  22. import asyncio
  23. import subprocess
  24. import sys
  25. import time
  26. import unittest
  27. import qubes.tests
  28. import qubes.firewall
  29. import qubes.vm.qubesvm
  30. import qubes.vm.appvm
  31. # noinspection PyAttributeOutsideInit,PyPep8Naming
  32. class VmNetworkingMixin(object):
    # Address that configure_netvm() assigns to the dummy ``test0`` interface
    # inside the test netvm; it is the ping/TCP target of the tests.
    test_ip = '192.168.123.45'
    # Hostname that the dnsmasq instance started by configure_netvm() maps
    # to test_ip.
    test_name = 'test.example.com'
    # Shell command template; ``{target}`` is an IP address or a hostname.
    ping_cmd = 'ping -W 1 -n -c 1 {target}'
    # Ready-to-run variants of ping_cmd for the address and for the name.
    ping_ip = ping_cmd.format(target=test_ip)
    ping_name = ping_cmd.format(target=test_name)
    # filled by load_tests
    template = None
  40. def run_cmd(self, vm, cmd, user="root"):
  41. '''Run a command *cmd* in a *vm* as *user*. Return its exit code.
  42. :type self: qubes.tests.SystemTestCase | VmNetworkingMixin
  43. :param qubes.vm.qubesvm.QubesVM vm: VM object to run command in
  44. :param str cmd: command to execute
  45. :param std user: user to execute command as
  46. :return int: command exit code
  47. '''
  48. try:
  49. self.loop.run_until_complete(vm.run_for_stdio(cmd, user=user))
  50. except subprocess.CalledProcessError as e:
  51. return e.returncode
  52. return 0
  53. def setUp(self):
  54. '''
  55. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  56. '''
  57. super(VmNetworkingMixin, self).setUp()
  58. if self.template.startswith('whonix-'):
  59. self.skipTest("Test not supported here - Whonix uses its own "
  60. "firewall settings")
  61. self.init_default_template(self.template)
  62. self.testnetvm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  63. name=self.make_vm_name('netvm1'),
  64. label='red')
  65. self.loop.run_until_complete(self.testnetvm.create_on_disk())
  66. self.testnetvm.provides_network = True
  67. self.testnetvm.netvm = None
  68. self.testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  69. name=self.make_vm_name('vm1'),
  70. label='red')
  71. self.loop.run_until_complete(self.testvm1.create_on_disk())
  72. self.testvm1.netvm = self.testnetvm
  73. self.app.save()
  74. self.configure_netvm()
  75. def configure_netvm(self):
  76. '''
  77. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  78. '''
  79. def run_netvm_cmd(cmd):
  80. if self.run_cmd(self.testnetvm, cmd) != 0:
  81. self.fail("Command '%s' failed" % cmd)
  82. if not self.testnetvm.is_running():
  83. self.loop.run_until_complete(self.testnetvm.start())
  84. # Ensure that dnsmasq is installed:
  85. try:
  86. self.loop.run_until_complete(self.testnetvm.run_for_stdio(
  87. 'dnsmasq --version', user='root'))
  88. except subprocess.CalledProcessError:
  89. self.skipTest("dnsmasq not installed")
  90. run_netvm_cmd("ip link add test0 type dummy")
  91. run_netvm_cmd("ip link set test0 up")
  92. run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
  93. run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT --wait".format(
  94. self.test_ip))
  95. # ignore failure
  96. self.run_cmd(self.testnetvm, "pkill dnsmasq")
  97. run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
  98. ip=self.test_ip, name=self.test_name))
  99. run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format(
  100. self.test_ip))
  101. run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
  102. def test_000_simple_networking(self):
  103. '''
  104. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  105. '''
  106. self.loop.run_until_complete(self.testvm1.start())
  107. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  108. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  109. def test_010_simple_proxyvm(self):
  110. '''
  111. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  112. '''
  113. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  114. name=self.make_vm_name('proxy'),
  115. label='red')
  116. self.proxy.provides_network = True
  117. self.proxy.netvm = self.testnetvm
  118. self.loop.run_until_complete(self.proxy.create_on_disk())
  119. self.testvm1.netvm = self.proxy
  120. self.app.save()
  121. self.loop.run_until_complete(self.testvm1.start())
  122. self.assertTrue(self.proxy.is_running())
  123. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  124. "Ping by IP from ProxyVM failed")
  125. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  126. "Ping by name from ProxyVM failed")
  127. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  128. "Ping by IP from AppVM failed")
  129. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  130. "Ping by IP from AppVM failed")
  131. @qubes.tests.expectedFailureIfTemplate('debian-7')
  132. @unittest.skipUnless(spawn.find_executable('xdotool'),
  133. "xdotool not installed")
  134. def test_020_simple_proxyvm_nm(self):
  135. '''
  136. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  137. '''
  138. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  139. name=self.make_vm_name('proxy'),
  140. label='red')
  141. self.proxy.provides_network = True
  142. self.loop.run_until_complete(self.proxy.create_on_disk())
  143. self.proxy.netvm = self.testnetvm
  144. self.proxy.features['service.network-manager'] = True
  145. self.testvm1.netvm = self.proxy
  146. self.app.save()
  147. self.loop.run_until_complete(self.testvm1.start())
  148. self.assertTrue(self.proxy.is_running())
  149. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  150. "Ping by IP failed")
  151. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  152. "Ping by name failed")
  153. # reconnect to make sure that device was configured by NM
  154. self.assertEqual(
  155. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  156. user="user"),
  157. 0, "Failed to disconnect eth0 using nmcli")
  158. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  159. "Network should be disabled, but apparently it isn't")
  160. self.assertEqual(
  161. self.run_cmd(self.proxy,
  162. 'nmcli connection up "VM uplink eth0" ifname eth0',
  163. user="user"),
  164. 0, "Failed to connect eth0 using nmcli")
  165. self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0,
  166. "Failed to wait for NM connection")
  167. # check for nm-applet presence
  168. self.assertEqual(subprocess.call([
  169. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  170. self.proxy.name)],
  171. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  172. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  173. "Ping by IP failed (after NM reconnection")
  174. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  175. "Ping by name failed (after NM reconnection)")
  176. def test_030_firewallvm_firewall(self):
  177. '''
  178. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  179. '''
  180. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  181. name=self.make_vm_name('proxy'),
  182. label='red')
  183. self.proxy.provides_network = True
  184. self.loop.run_until_complete(self.proxy.create_on_disk())
  185. self.proxy.netvm = self.testnetvm
  186. self.testvm1.netvm = self.proxy
  187. self.app.save()
  188. # block all for first
  189. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  190. self.testvm1.firewall.save()
  191. self.loop.run_until_complete(self.testvm1.start())
  192. self.assertTrue(self.proxy.is_running())
  193. server = self.loop.run_until_complete(self.testnetvm.run(
  194. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  195. try:
  196. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  197. "Ping by IP from ProxyVM failed")
  198. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  199. "Ping by name from ProxyVM failed")
  200. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  201. "Ping by IP should be blocked")
  202. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  203. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  204. "TCP connection should be blocked")
  205. # block all except ICMP
  206. self.testvm1.firewall.rules = [(
  207. qubes.firewall.Rule(None, action='accept', proto='icmp')
  208. )]
  209. self.testvm1.firewall.save()
  210. # Ugly hack b/c there is no feedback when the rules are actually
  211. # applied
  212. time.sleep(3)
  213. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  214. "Ping by IP failed (should be allowed now)")
  215. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  216. "Ping by name should be blocked")
  217. # all TCP still blocked
  218. self.testvm1.firewall.rules = [
  219. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  220. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  221. ]
  222. self.testvm1.firewall.save()
  223. # Ugly hack b/c there is no feedback when the rules are actually
  224. # applied
  225. time.sleep(3)
  226. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  227. "Ping by name failed (should be allowed now)")
  228. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  229. "TCP connection should be blocked")
  230. # block all except target
  231. self.testvm1.firewall.rules = [
  232. qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
  233. proto='tcp', dstports=1234),
  234. ]
  235. self.testvm1.firewall.save()
  236. # Ugly hack b/c there is no feedback when the rules are actually
  237. # applied
  238. time.sleep(3)
  239. self.assertEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  240. "TCP connection failed (should be allowed now)")
  241. # allow all except target
  242. self.testvm1.firewall.rules = [
  243. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
  244. proto='tcp', dstports=1234),
  245. qubes.firewall.Rule(action='accept'),
  246. ]
  247. self.testvm1.firewall.save()
  248. # Ugly hack b/c there is no feedback when the rules are actually
  249. # applied
  250. time.sleep(3)
  251. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  252. "TCP connection should be blocked")
  253. finally:
  254. server.terminate()
  255. self.loop.run_until_complete(server.wait())
  256. def test_040_inter_vm(self):
  257. '''
  258. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  259. '''
  260. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  261. name=self.make_vm_name('proxy'),
  262. label='red')
  263. self.loop.run_until_complete(self.proxy.create_on_disk())
  264. self.proxy.provides_network = True
  265. self.proxy.netvm = self.testnetvm
  266. self.testvm1.netvm = self.proxy
  267. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  268. name=self.make_vm_name('vm2'),
  269. label='red')
  270. self.loop.run_until_complete(self.testvm2.create_on_disk())
  271. self.testvm2.netvm = self.proxy
  272. self.app.save()
  273. self.loop.run_until_complete(asyncio.wait([
  274. self.testvm1.start(),
  275. self.testvm2.start()]))
  276. self.assertNotEqual(self.run_cmd(self.testvm1,
  277. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  278. self.testvm2.netvm = self.testnetvm
  279. self.assertNotEqual(self.run_cmd(self.testvm1,
  280. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  281. self.assertNotEqual(self.run_cmd(self.testvm2,
  282. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  283. self.testvm1.netvm = self.testnetvm
  284. self.assertNotEqual(self.run_cmd(self.testvm1,
  285. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  286. self.assertNotEqual(self.run_cmd(self.testvm2,
  287. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  288. def test_050_spoof_ip(self):
  289. '''Test if VM IP spoofing is blocked
  290. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  291. '''
  292. self.loop.run_until_complete(self.testvm1.start())
  293. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  294. self.assertEqual(self.run_cmd(self.testnetvm,
  295. 'iptables -I INPUT -i vif+ ! -s {} -p icmp -j LOG'.format(
  296. self.testvm1.ip)), 0)
  297. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  298. 'ip addr flush dev eth0 && '
  299. 'ip addr add 10.137.1.128/24 dev eth0 && '
  300. 'ip route add default dev eth0',
  301. user='root'))
  302. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  303. "Spoofed ping should be blocked")
  304. try:
  305. (output, _) = self.loop.run_until_complete(
  306. self.testnetvm.run_for_stdio('iptables -nxvL INPUT',
  307. user='root'))
  308. except subprocess.CalledProcessError:
  309. self.fail('iptables -nxvL INPUT failed')
  310. output = output.decode().splitlines()
  311. packets = output[2].lstrip().split()[0]
  312. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  313. def test_100_late_xldevd_startup(self):
  314. '''Regression test for #1990
  315. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  316. '''
  317. # Simulater late xl devd startup
  318. cmd = "systemctl stop xendriverdomain"
  319. if self.run_cmd(self.testnetvm, cmd) != 0:
  320. self.fail("Command '%s' failed" % cmd)
  321. self.loop.run_until_complete(self.testvm1.start())
  322. cmd = "systemctl start xendriverdomain"
  323. if self.run_cmd(self.testnetvm, cmd) != 0:
  324. self.fail("Command '%s' failed" % cmd)
  325. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  326. def test_110_dynamic_attach(self):
  327. self.testvm1.netvm = None
  328. self.loop.run_until_complete(self.testvm1.start())
  329. self.testvm1.netvm = self.testnetvm
  330. # wait for it to settle down
  331. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  332. 'udevadm settle'))
  333. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  334. def test_111_dynamic_detach_attach(self):
  335. self.loop.run_until_complete(self.testvm1.start())
  336. self.testvm1.netvm = None
  337. # wait for it to settle down
  338. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  339. 'udevadm settle'))
  340. self.testvm1.netvm = self.testnetvm
  341. # wait for it to settle down
  342. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  343. 'udevadm settle'))
  344. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  345. def test_112_reattach_after_provider_shutdown(self):
  346. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  347. name=self.make_vm_name('proxy'),
  348. label='red')
  349. self.proxy.provides_network = True
  350. self.proxy.netvm = self.testnetvm
  351. self.loop.run_until_complete(self.proxy.create_on_disk())
  352. self.testvm1.netvm = self.proxy
  353. self.loop.run_until_complete(self.testvm1.start())
  354. self.loop.run_until_complete(self.proxy.shutdown(force=True, wait=True))
  355. self.loop.run_until_complete(self.proxy.start())
  356. # wait for it to settle down
  357. self.loop.run_until_complete(asyncio.sleep(5))
  358. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  359. def test_113_reattach_after_provider_kill(self):
  360. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  361. name=self.make_vm_name('proxy'),
  362. label='red')
  363. self.proxy.provides_network = True
  364. self.proxy.netvm = self.testnetvm
  365. self.loop.run_until_complete(self.proxy.create_on_disk())
  366. self.testvm1.netvm = self.proxy
  367. self.loop.run_until_complete(self.testvm1.start())
  368. self.loop.run_until_complete(self.proxy.kill())
  369. self.loop.run_until_complete(self.proxy.start())
  370. # wait for it to settle down
  371. self.loop.run_until_complete(asyncio.sleep(5))
  372. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  373. def test_114_reattach_after_provider_crash(self):
  374. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  375. name=self.make_vm_name('proxy'),
  376. label='red')
  377. self.proxy.provides_network = True
  378. self.proxy.netvm = self.testnetvm
  379. self.loop.run_until_complete(self.proxy.create_on_disk())
  380. self.testvm1.netvm = self.proxy
  381. self.loop.run_until_complete(self.testvm1.start())
  382. p = self.loop.run_until_complete(self.proxy.run(
  383. 'echo c > /proc/sysrq-trigger', user='root'))
  384. self.loop.run_until_complete(p.wait())
  385. timeout = 10
  386. while self.proxy.is_running():
  387. self.loop.run_until_complete(asyncio.sleep(1))
  388. timeout -= 1
  389. self.assertGreater(timeout, 0,
  390. 'timeout waiting for crash cleanup')
  391. self.loop.run_until_complete(self.proxy.start())
  392. # wait for it to settle down
  393. self.loop.run_until_complete(asyncio.sleep(5))
  394. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  395. def test_200_fake_ip_simple(self):
  396. '''Test hiding VM real IP
  397. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  398. '''
  399. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  400. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  401. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  402. self.app.save()
  403. self.loop.run_until_complete(self.testvm1.start())
  404. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  405. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  406. try:
  407. (output, _) = self.loop.run_until_complete(
  408. self.testvm1.run_for_stdio(
  409. 'ip addr show dev eth0', user='root'))
  410. except subprocess.CalledProcessError:
  411. self.fail('ip addr show dev eth0 failed')
  412. output = output.decode()
  413. self.assertIn('192.168.1.128', output)
  414. self.assertNotIn(str(self.testvm1.ip), output)
  415. try:
  416. (output, _) = self.loop.run_until_complete(
  417. self.testvm1.run_for_stdio('ip route show', user='root'))
  418. except subprocess.CalledProcessError:
  419. self.fail('ip route show failed')
  420. output = output.decode()
  421. self.assertIn('192.168.1.1', output)
  422. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  423. def test_201_fake_ip_without_gw(self):
  424. '''Test hiding VM real IP
  425. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  426. '''
  427. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  428. self.app.save()
  429. self.loop.run_until_complete(self.testvm1.start())
  430. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  431. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  432. try:
  433. (output, _) = self.loop.run_until_complete(
  434. self.testvm1.run_for_stdio('ip addr show dev eth0',
  435. user='root'))
  436. except subprocess.CalledProcessError:
  437. self.fail('ip addr show dev eth0 failed')
  438. output = output.decode()
  439. self.assertIn('192.168.1.128', output)
  440. self.assertNotIn(str(self.testvm1.ip), output)
  441. def test_202_fake_ip_firewall(self):
  442. '''Test hiding VM real IP, firewall
  443. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  444. '''
  445. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  446. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  447. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  448. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  449. name=self.make_vm_name('proxy'),
  450. label='red')
  451. self.proxy.provides_network = True
  452. self.loop.run_until_complete(self.proxy.create_on_disk())
  453. self.proxy.netvm = self.testnetvm
  454. self.testvm1.netvm = self.proxy
  455. self.app.save()
  456. # block all but ICMP and DNS
  457. self.testvm1.firewall.rules = [
  458. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  459. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  460. ]
  461. self.testvm1.firewall.save()
  462. self.loop.run_until_complete(self.testvm1.start())
  463. self.assertTrue(self.proxy.is_running())
  464. server = self.loop.run_until_complete(self.testnetvm.run(
  465. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  466. try:
  467. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  468. "Ping by IP from ProxyVM failed")
  469. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  470. "Ping by name from ProxyVM failed")
  471. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  472. "Ping by IP should be allowed")
  473. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  474. "Ping by name should be allowed")
  475. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  476. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  477. "TCP connection should be blocked")
  478. finally:
  479. server.terminate()
  480. self.loop.run_until_complete(server.wait())
  481. def test_203_fake_ip_inter_vm_allow(self):
  482. '''Access VM with "fake IP" from other VM (when firewall allows)
  483. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  484. '''
  485. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  486. name=self.make_vm_name('proxy'),
  487. label='red')
  488. self.loop.run_until_complete(self.proxy.create_on_disk())
  489. self.proxy.provides_network = True
  490. self.proxy.netvm = self.testnetvm
  491. self.testvm1.netvm = self.proxy
  492. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  493. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  494. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  495. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  496. name=self.make_vm_name('vm2'),
  497. label='red')
  498. self.loop.run_until_complete(self.testvm2.create_on_disk())
  499. self.testvm2.netvm = self.proxy
  500. self.app.save()
  501. self.loop.run_until_complete(self.testvm1.start())
  502. self.loop.run_until_complete(self.testvm2.start())
  503. cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
  504. self.testvm2.ip, self.testvm1.ip)
  505. try:
  506. self.loop.run_until_complete(self.proxy.run_for_stdio(
  507. cmd, user='root'))
  508. except subprocess.CalledProcessError as e:
  509. raise AssertionError(
  510. '{} failed with: {}'.format(cmd, e.returncode)) from None
  511. try:
  512. cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format(self.testvm2.ip)
  513. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  514. cmd, user='root'))
  515. except subprocess.CalledProcessError as e:
  516. raise AssertionError(
  517. '{} failed with: {}'.format(cmd, e.returncode)) from None
  518. self.assertEqual(self.run_cmd(self.testvm2,
  519. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  520. try:
  521. cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip)
  522. (stdout, _) = self.loop.run_until_complete(
  523. self.testvm1.run_for_stdio(cmd, user='root'))
  524. except subprocess.CalledProcessError as e:
  525. raise AssertionError(
  526. '{} failed with {}'.format(cmd, e.returncode)) from None
  527. self.assertNotEqual(stdout.decode().split()[0], '0',
  528. 'Packets didn\'t managed to the VM')
  529. def test_204_fake_ip_proxy(self):
  530. '''Test hiding VM real IP
  531. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  532. '''
  533. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  534. name=self.make_vm_name('proxy'),
  535. label='red')
  536. self.loop.run_until_complete(self.proxy.create_on_disk())
  537. self.proxy.provides_network = True
  538. self.proxy.netvm = self.testnetvm
  539. self.proxy.features['net.fake-ip'] = '192.168.1.128'
  540. self.proxy.features['net.fake-gateway'] = '192.168.1.1'
  541. self.proxy.features['net.fake-netmask'] = '255.255.255.0'
  542. self.testvm1.netvm = self.proxy
  543. self.app.save()
  544. self.loop.run_until_complete(self.testvm1.start())
  545. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0)
  546. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0)
  547. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  548. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  549. try:
  550. (output, _) = self.loop.run_until_complete(
  551. self.proxy.run_for_stdio(
  552. 'ip addr show dev eth0', user='root'))
  553. except subprocess.CalledProcessError:
  554. self.fail('ip addr show dev eth0 failed')
  555. output = output.decode()
  556. self.assertIn('192.168.1.128', output)
  557. self.assertNotIn(str(self.testvm1.ip), output)
  558. try:
  559. (output, _) = self.loop.run_until_complete(
  560. self.proxy.run_for_stdio(
  561. 'ip route show', user='root'))
  562. except subprocess.CalledProcessError:
  563. self.fail('ip route show failed')
  564. output = output.decode()
  565. self.assertIn('192.168.1.1', output)
  566. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  567. try:
  568. (output, _) = self.loop.run_until_complete(
  569. self.testvm1.run_for_stdio(
  570. 'ip addr show dev eth0', user='root'))
  571. except subprocess.CalledProcessError:
  572. self.fail('ip addr show dev eth0 failed')
  573. output = output.decode()
  574. self.assertNotIn('192.168.1.128', output)
  575. self.assertIn(str(self.testvm1.ip), output)
  576. try:
  577. (output, _) = self.loop.run_until_complete(
  578. self.testvm1.run_for_stdio(
  579. 'ip route show', user='root'))
  580. except subprocess.CalledProcessError:
  581. self.fail('ip route show failed')
  582. output = output.decode()
  583. self.assertIn('192.168.1.128', output)
  584. self.assertNotIn(str(self.proxy.ip), output)
  585. def test_210_custom_ip_simple(self):
  586. '''Custom AppVM IP
  587. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  588. '''
  589. self.testvm1.ip = '192.168.1.1'
  590. self.app.save()
  591. self.loop.run_until_complete(self.testvm1.start())
  592. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  593. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  594. def test_211_custom_ip_proxy(self):
  595. '''Custom ProxyVM IP
  596. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  597. '''
  598. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  599. name=self.make_vm_name('proxy'),
  600. label='red')
  601. self.loop.run_until_complete(self.proxy.create_on_disk())
  602. self.proxy.provides_network = True
  603. self.proxy.netvm = self.testnetvm
  604. self.proxy.ip = '192.168.1.1'
  605. self.testvm1.netvm = self.proxy
  606. self.app.save()
  607. self.loop.run_until_complete(self.testvm1.start())
  608. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  609. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  610. def test_212_custom_ip_firewall(self):
  611. '''Custom VM IP and firewall
  612. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  613. '''
  614. self.testvm1.ip = '192.168.1.1'
  615. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  616. name=self.make_vm_name('proxy'),
  617. label='red')
  618. self.proxy.provides_network = True
  619. self.loop.run_until_complete(self.proxy.create_on_disk())
  620. self.proxy.netvm = self.testnetvm
  621. self.testvm1.netvm = self.proxy
  622. self.app.save()
  623. # block all but ICMP and DNS
  624. self.testvm1.firewall.rules = [
  625. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  626. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  627. ]
  628. self.testvm1.firewall.save()
  629. self.loop.run_until_complete(self.testvm1.start())
  630. self.assertTrue(self.proxy.is_running())
  631. server = self.loop.run_until_complete(self.testnetvm.run(
  632. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  633. try:
  634. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  635. "Ping by IP from ProxyVM failed")
  636. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  637. "Ping by name from ProxyVM failed")
  638. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  639. "Ping by IP should be allowed")
  640. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  641. "Ping by name should be allowed")
  642. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  643. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  644. "TCP connection should be blocked")
  645. finally:
  646. server.terminate()
  647. self.loop.run_until_complete(server.wait())
  648. # noinspection PyAttributeOutsideInit,PyPep8Naming
  649. class VmIPv6NetworkingMixin(VmNetworkingMixin):
  650. test_ip6 = '2000:abcd::1'
  651. ping6_cmd = 'ping6 -W 1 -n -c 1 {target}'
  652. def setUp(self):
  653. super(VmIPv6NetworkingMixin, self).setUp()
  654. self.ping6_ip = self.ping6_cmd.format(target=self.test_ip6)
  655. self.ping6_name = self.ping6_cmd.format(target=self.test_name)
  656. def configure_netvm(self):
  657. '''
  658. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  659. '''
  660. self.testnetvm.features['ipv6'] = True
  661. super(VmIPv6NetworkingMixin, self).configure_netvm()
  662. def run_netvm_cmd(cmd):
  663. if self.run_cmd(self.testnetvm, cmd) != 0:
  664. self.fail("Command '%s' failed" % cmd)
  665. run_netvm_cmd("ip addr add {}/128 dev test0".format(self.test_ip6))
  666. run_netvm_cmd(
  667. "ip6tables -I INPUT -d {} -j ACCEPT".format(self.test_ip6))
  668. # ignore failure
  669. self.run_cmd(self.testnetvm, "pkill dnsmasq")
  670. run_netvm_cmd(
  671. "dnsmasq -a {ip} -A /{name}/{ip} -A /{name}/{ip6} -i test0 -z".
  672. format(ip=self.test_ip, ip6=self.test_ip6, name=self.test_name))
  673. def test_500_ipv6_simple_networking(self):
  674. '''
  675. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  676. '''
  677. self.loop.run_until_complete(self.testvm1.start())
  678. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  679. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  680. def test_510_ipv6_simple_proxyvm(self):
  681. '''
  682. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  683. '''
  684. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  685. name=self.make_vm_name('proxy'),
  686. label='red')
  687. self.proxy.provides_network = True
  688. self.proxy.netvm = self.testnetvm
  689. self.loop.run_until_complete(self.proxy.create_on_disk())
  690. self.testvm1.netvm = self.proxy
  691. self.app.save()
  692. self.loop.run_until_complete(self.testvm1.start())
  693. self.assertTrue(self.proxy.is_running())
  694. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  695. "Ping by IP from ProxyVM failed")
  696. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  697. "Ping by name from ProxyVM failed")
  698. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  699. "Ping by IP from AppVM failed")
  700. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  701. "Ping by IP from AppVM failed")
  702. @qubes.tests.expectedFailureIfTemplate('debian-7')
  703. @unittest.skipUnless(spawn.find_executable('xdotool'),
  704. "xdotool not installed")
  705. def test_520_ipv6_simple_proxyvm_nm(self):
  706. '''
  707. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  708. '''
  709. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  710. name=self.make_vm_name('proxy'),
  711. label='red')
  712. self.proxy.provides_network = True
  713. self.loop.run_until_complete(self.proxy.create_on_disk())
  714. self.proxy.netvm = self.testnetvm
  715. self.proxy.features['service.network-manager'] = True
  716. self.testvm1.netvm = self.proxy
  717. self.app.save()
  718. self.loop.run_until_complete(self.testvm1.start())
  719. self.assertTrue(self.proxy.is_running())
  720. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  721. "Ping by IP failed")
  722. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  723. "Ping by name failed")
  724. # reconnect to make sure that device was configured by NM
  725. self.assertEqual(
  726. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  727. user="user"),
  728. 0, "Failed to disconnect eth0 using nmcli")
  729. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  730. "Network should be disabled, but apparently it isn't")
  731. self.assertEqual(
  732. self.run_cmd(self.proxy,
  733. 'nmcli connection up "VM uplink eth0" ifname eth0',
  734. user="user"),
  735. 0, "Failed to connect eth0 using nmcli")
  736. self.assertEqual(self.run_cmd(self.proxy, "nm-online",
  737. user="user"), 0,
  738. "Failed to wait for NM connection")
  739. # wait for duplicate-address-detection to complete - by default it has
  740. # 1s timeout
  741. time.sleep(2)
  742. # check for nm-applet presence
  743. self.assertEqual(subprocess.call([
  744. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  745. self.proxy.name)],
  746. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  747. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  748. "Ping by IP failed (after NM reconnection")
  749. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  750. "Ping by name failed (after NM reconnection)")
  751. def test_530_ipv6_firewallvm_firewall(self):
  752. '''
  753. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  754. '''
  755. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  756. name=self.make_vm_name('proxy'),
  757. label='red')
  758. self.proxy.provides_network = True
  759. self.loop.run_until_complete(self.proxy.create_on_disk())
  760. self.proxy.netvm = self.testnetvm
  761. self.testvm1.netvm = self.proxy
  762. self.app.save()
  763. # block all for first
  764. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  765. self.testvm1.firewall.save()
  766. self.loop.run_until_complete(self.testvm1.start())
  767. self.assertTrue(self.proxy.is_running())
  768. server = self.loop.run_until_complete(self.testnetvm.run(
  769. 'socat TCP6-LISTEN:1234,fork EXEC:/bin/uname'))
  770. try:
  771. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  772. "Ping by IP from ProxyVM failed")
  773. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  774. "Ping by name from ProxyVM failed")
  775. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  776. "Ping by IP should be blocked")
  777. client6_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
  778. client4_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  779. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  780. "TCP connection should be blocked")
  781. # block all except ICMP
  782. self.testvm1.firewall.rules = [(
  783. qubes.firewall.Rule(None, action='accept', proto='icmp')
  784. )]
  785. self.testvm1.firewall.save()
  786. # Ugly hack b/c there is no feedback when the rules are actually
  787. # applied
  788. time.sleep(3)
  789. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  790. "Ping by IP failed (should be allowed now)")
  791. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  792. "Ping by name should be blocked")
  793. # all TCP still blocked
  794. self.testvm1.firewall.rules = [
  795. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  796. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  797. ]
  798. self.testvm1.firewall.save()
  799. # Ugly hack b/c there is no feedback when the rules are actually
  800. # applied
  801. time.sleep(3)
  802. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  803. "Ping by name failed (should be allowed now)")
  804. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  805. "TCP connection should be blocked")
  806. # block all except target
  807. self.testvm1.firewall.rules = [
  808. qubes.firewall.Rule(None, action='accept',
  809. dsthost=self.test_ip6,
  810. proto='tcp', dstports=1234),
  811. ]
  812. self.testvm1.firewall.save()
  813. # Ugly hack b/c there is no feedback when the rules are actually
  814. # applied
  815. time.sleep(3)
  816. self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  817. "TCP connection failed (should be allowed now)")
  818. # block all except target - by name
  819. self.testvm1.firewall.rules = [
  820. qubes.firewall.Rule(None, action='accept',
  821. dsthost=self.test_name,
  822. proto='tcp', dstports=1234),
  823. ]
  824. self.testvm1.firewall.save()
  825. # Ugly hack b/c there is no feedback when the rules are actually
  826. # applied
  827. time.sleep(3)
  828. self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  829. "TCP (IPv6) connection failed (should be allowed now)")
  830. self.assertEqual(self.run_cmd(self.testvm1, client4_cmd),
  831. 0,
  832. "TCP (IPv4) connection failed (should be allowed now)")
  833. # allow all except target
  834. self.testvm1.firewall.rules = [
  835. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip6,
  836. proto='tcp', dstports=1234),
  837. qubes.firewall.Rule(action='accept'),
  838. ]
  839. self.testvm1.firewall.save()
  840. # Ugly hack b/c there is no feedback when the rules are actually
  841. # applied
  842. time.sleep(3)
  843. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  844. "TCP connection should be blocked")
  845. finally:
  846. server.terminate()
  847. self.loop.run_until_complete(server.wait())
  848. def test_540_ipv6_inter_vm(self):
  849. '''
  850. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  851. '''
  852. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  853. name=self.make_vm_name('proxy'),
  854. label='red')
  855. self.loop.run_until_complete(self.proxy.create_on_disk())
  856. self.proxy.provides_network = True
  857. self.proxy.netvm = self.testnetvm
  858. self.testvm1.netvm = self.proxy
  859. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  860. name=self.make_vm_name('vm2'),
  861. label='red')
  862. self.loop.run_until_complete(self.testvm2.create_on_disk())
  863. self.testvm2.netvm = self.proxy
  864. self.app.save()
  865. self.loop.run_until_complete(asyncio.wait([
  866. self.testvm1.start(),
  867. self.testvm2.start()]))
  868. self.assertNotEqual(self.run_cmd(self.testvm1,
  869. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  870. self.testvm2.netvm = self.testnetvm
  871. self.assertNotEqual(self.run_cmd(self.testvm1,
  872. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  873. self.assertNotEqual(self.run_cmd(self.testvm2,
  874. self.ping_cmd.format(target=self.testvm1.ip6)), 0)
  875. self.testvm1.netvm = self.testnetvm
  876. self.assertNotEqual(self.run_cmd(self.testvm1,
  877. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  878. self.assertNotEqual(self.run_cmd(self.testvm2,
  879. self.ping_cmd.format(target=self.testvm1.ip6)), 0)
  880. def test_550_ipv6_spoof_ip(self):
  881. '''Test if VM IP spoofing is blocked
  882. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  883. '''
  884. self.loop.run_until_complete(self.testvm1.start())
  885. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  886. # add a simple rule counting packets
  887. self.assertEqual(self.run_cmd(self.testnetvm,
  888. 'ip6tables -I INPUT -i vif+ ! -s {} -p icmpv6 -j LOG'.format(
  889. self.testvm1.ip6)), 0)
  890. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  891. 'ip -6 addr flush dev eth0 && '
  892. 'ip -6 addr add {}/128 dev eth0 && '
  893. 'ip -6 route add default via {} dev eth0'.format(
  894. str(self.testvm1.visible_ip6) + '1',
  895. str(self.testvm1.visible_gateway6)),
  896. user='root'))
  897. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  898. "Spoofed ping should be blocked")
  899. try:
  900. (output, _) = self.loop.run_until_complete(
  901. self.testnetvm.run_for_stdio('ip6tables -nxvL INPUT',
  902. user='root'))
  903. except subprocess.CalledProcessError:
  904. self.fail('ip6tables -nxvL INPUT failed')
  905. output = output.decode().splitlines()
  906. packets = output[2].lstrip().split()[0]
  907. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  908. def test_710_ipv6_custom_ip_simple(self):
  909. '''Custom AppVM IP
  910. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  911. '''
  912. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  913. self.app.save()
  914. self.loop.run_until_complete(self.testvm1.start())
  915. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  916. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  917. def test_711_ipv6_custom_ip_proxy(self):
  918. '''Custom ProxyVM IP
  919. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  920. '''
  921. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  922. name=self.make_vm_name('proxy'),
  923. label='red')
  924. self.loop.run_until_complete(self.proxy.create_on_disk())
  925. self.proxy.provides_network = True
  926. self.proxy.netvm = self.testnetvm
  927. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  928. self.testvm1.netvm = self.proxy
  929. self.app.save()
  930. self.loop.run_until_complete(self.testvm1.start())
  931. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  932. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  933. def test_712_ipv6_custom_ip_firewall(self):
  934. '''Custom VM IP and firewall
  935. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  936. '''
  937. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  938. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  939. name=self.make_vm_name('proxy'),
  940. label='red')
  941. self.proxy.provides_network = True
  942. self.loop.run_until_complete(self.proxy.create_on_disk())
  943. self.proxy.netvm = self.testnetvm
  944. self.testvm1.netvm = self.proxy
  945. self.app.save()
  946. # block all but ICMP and DNS
  947. self.testvm1.firewall.rules = [
  948. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  949. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  950. ]
  951. self.testvm1.firewall.save()
  952. self.loop.run_until_complete(self.testvm1.start())
  953. self.assertTrue(self.proxy.is_running())
  954. server = self.loop.run_until_complete(self.testnetvm.run(
  955. 'socat TCP6-LISTEN:1234,fork EXEC:/bin/uname'))
  956. try:
  957. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  958. "Ping by IP from ProxyVM failed")
  959. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  960. "Ping by name from ProxyVM failed")
  961. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  962. "Ping by IP should be allowed")
  963. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  964. "Ping by name should be allowed")
  965. client_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
  966. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  967. "TCP connection should be blocked")
  968. finally:
  969. server.terminate()
  970. self.loop.run_until_complete(server.wait())
  971. # noinspection PyAttributeOutsideInit,PyPep8Naming
  972. class VmUpdatesMixin(object):
  973. """
  974. Tests for VM updates
  975. """
  976. # filled by load_tests
  977. template = None
  978. # made this way to work also when no package build tools are installed
  979. """
  980. $ cat test-pkg.spec:
  981. Name: test-pkg
  982. Version: 1.0
  983. Release: 1%{?dist}
  984. Summary: Test package
  985. Group: System
  986. License: GPL
  987. URL: http://example.com/
  988. %description
  989. Test package
  990. %files
  991. %changelog
  992. $ rpmbuild -bb test-pkg.spec
  993. $ cat test-pkg-1.0-1.fc21.x86_64.rpm | gzip | base64
  994. $ cat test-pkg-1.1-1.fc21.x86_64.rpm | gzip | base64
  995. """
  996. RPM_PACKAGE_GZIP_BASE64 = [(
  997. b"H4sIAPzRLlYAA+2Y728URRjHn7ueUCkERKJVJDnTxLSxs7293o8WOER6ljYYrtKCLUSa3"
  998. b"bnZ64bd22VmTq8nr4wJbwxvjNHIG0x8oTHGGCHB8AcYE1/0lS80GgmQFCJU3wgB4ZjdfZ"
  999. b"q2xDe8NNlvMjfzmeeZH7tPbl98b35169cOUEpIJiTxT9SIrmVUs2hWh8dUAp54dOrM14s"
  1000. b"JHK4D2DKl+j2qrVfjsuq3qEWbohjuAB2Lqk+p1o/8Z5QPmSi/YwnjezH+F8bLQZjqllW0"
  1001. b"hvODRmFIL5hFk9JMXi/mi5ZuDleNwSEzP5wtmLnouNQnm3/6fndz7FLt9M/Hruj37gav4"
  1002. b"tTjPnasWLFixYoVK1asWLFixYoV63+p0KNot9vnIPQc1vgYOwCSgXfxCoS+QzKHOVXVOj"
  1003. b"Fn2ccIfI0k8nXkLuQbyJthxed4UrVnkG8i9yDfgsj3yCAv4foc8t+w1hf5B+Nl5Du43xj"
  1004. b"yvxivIN9HpsgPkO2IU9uQfeRn8Xk/iJ4x1Y3nfxH1qecwfhH5+YgT25F7o/0SRdxvOppP"
  1005. b"7MX9ZjB/DNnE/OOYX404uRGZIT+FbCFvQ3aQ8f0+/WF0XjJ8nyOw7H+BrmUA/a8pNZf2D"
  1006. b"XrCqLG1cERbWHI8ajhznpBY9P0Tr8PkvJDMhTkp/Z0DA6xpuL7DNOq5A+DY9UYTmkOF2U"
  1007. b"IO/sNt0wSnGvfdlZssD3rVIlLI9UUX37C6qXzHNntHPNfnTAhWHbUddtBwmegDjAUzZbu"
  1008. b"m9lqZmzDmHc8Ik8WY8Tab4Myym4+Gx8V0qw8GtYyWIzrktEJwV9UHv3ktG471rAqHTmFQ"
  1009. b"685V5uGqIalk06SWJr7tszR503Ac9cs493jJ8rhrSCIYbXBbzqt5v5+UZ0crh6bGR2dmJ"
  1010. b"yuHD428VlLLLdakzJe2VxcKhFSFID73JKPS40RI7tXVCcQ3uOGWhPCJ2bAspiJ2i5Vy6n"
  1011. b"jOqMerpEYpEe/Yks4xkU4Tt6BirmzUWanG6ozbFKhve9BsQRaLRTirzqk7hgUktXojKnf"
  1012. b"n8jeg3X4QepP3i63po6oml+9t/CwJLya2Bn/ei6f7/4B3Ycdb0L3pt5Q5mNz16rWJ9fLk"
  1013. b"vvOff/nxS7//8O2P2gvt7nDDnoV9L1du9N4+ucjl9u/8+a7dC5Nnvjlv9Ox5r+v9Cy0NE"
  1014. b"m+c6rv60S/dZw98Gn6MNswcfQiWUvg3wBUAAA=="
  1015. ), (
  1016. b"H4sIAMY1B1wCA+2Y72scRRjH537U1MZorKLRVjgJSIKdvf11u3dq0jZJ0wRLL+1VvBRrn"
  1017. b"J2dvVu6t7vd3bN3sS9ECr6RIkgR9JXQF5aiIk1L/gJF8EVe+KqiKLQQi03tmypojXO3zz"
  1018. b"Vp8YW+3y/MPvOZ55lnZthhXjw3Lqx9n0FcqYiFEfaP17AkSLxZVJbQ/1QKbbl/6Mxnqyn"
  1019. b"o9iE0uMTtOPTPcTvIJw1w+8DdDCj1KPBozJlVbrO8OcC/xvORH8/P3AT/2+D/DfynuTtH"
  1020. b"FKtANKNo6KpKJIs3jSkl3VIkvSAWSiZTlCL3akhXZCKKKmPcaRRJqURFS2MlSTMsgyqMz"
  1021. b"6RUp8SQFcmixZJpMlEWi0w1da3Eu3p3+1uW1saPHfpWOSvNXtruDVx4+A0+eAolSpQoUa"
  1022. b"JEiRIlSpQoUaJEiRJBTWR9ff191K1p3FM3ySGUEbndjbp1jUwOYkzetkJMr07SqZukgX8"
  1023. b"B7ge+DvwI2qijPMjbE8A3gIeB11BcVxGBb8J8FfgW+PcA3wb/FPAfkG8G+C/wl4HvAFPg"
  1024. b"v4HtmLOPA/vAT8J534vPmB2C9T+NbfYp8C8DPx1zagfwSJwvpUO+ajye2gP55iF+BtiA+"
  1025. b"Nch3ow5/TkwA74IbAFfBnaAl2N+7IN4vfQK8Ffg/w74arx++grwtTg+s7PDk6hXn0OSIC"
  1026. b"Gozx3hYzmf0OOkxu6F1/oKlx2PEqfuhRFckv1zB1ClHUasgepR5L+Qz7MWafgOE6jXyCP"
  1027. b"Hdpst1CpqC5qK/qUaKIQBFQK/sbGTXmeET8KaCgW7bZsbj3dsY2TSa/gBC0NmTtsOO0ga"
  1028. b"LBxF4OuMTNk1nmtjbI60HY90g8MZ8iabC5hlt+53z4bVxVGkCKKgYgmpgiaIXdv5FgS52"
  1029. b"5dUQY6P37kbWzcVNzd1cVnO4VoO+7bPcvhV4jj8y4LAC8YsL2iQCIeMNgM7avNxfxeeWp"
  1030. b"guHz4yOz2/UCm/cnhy35jcG99/YHZislpd2Fup7OMR5YOVHLZYizI/sj035BBG/BdhP/A"
  1031. b"iRiMvwGEUeC5fuxYw6gUmrlGKw5N2ROuMh4c+o+FYvhkGeX7wPD9/PmBmnURgcJ0EJnOZ"
  1032. b"iSmV/kM4cV3PsN04uqGp/BM1XTZW4zkCm/L9kbDt0jrfk9cMcdM9absmjojhsI3NU4eE9"
  1033. b"d4R+LG4g1qbGFHf9lBrEclwnTCs3r1iuOY2u/+jGVm4iCwiyXpJE61SkUq6RhVW0FVFpo"
  1034. b"ZZ0oiu6ppuFSxSFBXTUOQCFRmhhElFQ9XNgiyJhbv/dnf8hnaeETR4R1+sHuX37+c/H/o"
  1035. b"kjZ5Nbe88bMvv7voJvYWeOYaGBn7IGkr6xb3X5vqiExNL585/+NyPX3/5jbBzfaibcHhl"
  1036. b"4vny9ZHfT6wG0Y6Lfrv/pZXKmS+WyPD4O/2nLy0KKHXo1OjVs1eGPn75o+5DvW3+6D9jd"
  1037. b"bFaTBcAAA=="
  1038. )]
  1039. """
  1040. Minimal package generated by running dh_make on empty directory
  1041. Then cat test-pkg_1.0-1_amd64.deb | gzip | base64
  1042. Then cat test-pkg_1.1-1_amd64.deb | gzip | base64
  1043. """
  1044. DEB_PACKAGE_GZIP_BASE64 = [(
  1045. b"H4sIACTXLlYAA1O0SSxKzrDjSklNykzM003KzEssqlRQUDA0MTG1NDQwNDVTUDBQAAEIa"
  1046. b"WhgYGZioqBgogADCVxGegZcyfl5JUX5OXoliUV66VVE6DcwheuX7+ZgAAEW5rdXHb0PG4"
  1047. b"iwf5j3WfMT6zWzzMuZgoE3jjYraNzbbFKWGms0SaRw/r2SV23WZ4IdP8preM4yqf0jt95"
  1048. b"3c8qnacfNxJUkf9/w+/3X9ph2GEdgQdixrz/niHKKTnYXizf4oSC7tHOz2Zzq+/6vn8/7"
  1049. b"ezQ7c1tmi7xZ3SGJ4yzhT2dcr7V+W3zM5ZPu/56PSv4Zdok+7Yv/V/6buWaKVlFkkV58S"
  1050. b"N3GmLgnqzRmeZ3V3ymmurS5fGa85/LNx1bpZMin3S6dvXKqydp3ubP1vmyarJZb/qSh62"
  1051. b"C8oIdxqm/BtvkGDza+On/Vfv2py7/0LV7VH+qR6a+bkKUbHXt5/SG187d+nps1a5PJfMO"
  1052. b"i11dWcUe1HjwaW3Q5RHXn9LmcHy+tW9YcKf0768XVB1t3R0bKrzs5t9P+6r7rZ99svH10"
  1053. b"+Q6F/o8tf1fO/32y+fWa14eifd+WxUy0jcxYH7N9/tUvmnUZL74pW32qLeuRU+ZwYGASa"
  1054. b"GBgUWBgxM90ayy3VdmykkGDgYErJbEkERydFVWQmCMQo8aWZvAY/WteFRHFwMCYqXTPjI"
  1055. b"lBkVEMGLsl+k8XP1D/z+gXyyDOvUemlnHqAVkvu0rRQ2fUFodkN3mtU9uwhqk8V+TqPEE"
  1056. b"Nc7fzoQ4n71lqRs/7kbbT0+qOZuKH4r8mjzsc1k/YkCHN8Pjg48fbpE+teHa96LNcfu0V"
  1057. b"5n2/Z2xa2KDvaCOx8cqBFxc514uZ3TmadXS+6cpzU7wSzq5SWfapJOD9n6wLXSwtlgxZh"
  1058. b"xITzWW7buhx/bb291RcVlEfeC9K5hlrqunSzIMSZT7/Nqgc/qMvMNW227WI8ezB8mVuZh"
  1059. b"0hERJSvysfburr4Dx0I9BW57UwR4+e1gxu49PcEt8sbK18Xpvt//Hj5UYm+Zc25q+T4xl"
  1060. b"rJvxfVnh80oadq57OZxPaU1bbztv1yF365W4t45Yr+XrFzov237GVY1Zgf7NvE4+W2SuR"
  1061. b"lQtLauR1TQ/mbOiIONYya6tU1jPGpWfk/i1+ttiXe3ZO14n0YOWggndznjGlGLyfVbBC6"
  1062. b"MRP5aMM7aCco/s7sZqB8RlTQwADw8rnuT/sDHi7mUASjJFRAAbWwNLiAwAA"
  1063. ), (
  1064. b"H4sIAL05B1wCA1O0SSxKzrDjSklNykzM003KzEssqlRQUDA0NTG2NDc3NjdTUDBQAAEIa"
  1065. b"WhgYGZioqBgogADCVxGegZcyfl5JUX5OXoliUV66VVE6De3gOuX7+ZgAAEW5rdXzmbdMR"
  1066. b"BgSJj/VeQzQ+ztT/W+EVEnFraKOTlXh6+JXB8RbTRpzgWb2qdLX0+RmTRZcYlyxJutJsk"
  1067. b"/pfsfq9yqWZJ4JVVS97jBPPnz1yviluw51b0q4tnrWemCU2a/17mTUBYX0XBC6nH8rvvZ"
  1068. b"n/WP7nu40+Jlz7drPNLvCjULQkXOv677OV9s4bPsv5+tvCzPG8s57no479qV/5V/813Kh"
  1069. b"Wy3Pbj4827Jq5v6W/wk7zL1/+zbfH6btVb/3Pm5EapukaJvdgfcape/JZZWe+mZ4+Grby"
  1070. b"7UTaroPzyv9urC1W2MT9+F2bZtWJOyXfGo5dv7DGXJUzee+p930Od0j8QNceNHJffOTr2"
  1071. b"kOJe93mWG+nPdLsG6fz++MV5h1OGr0N9yf3N2ydzQ5x/E9Aw/s9xzmOpULnKtsSZqc/rr"
  1072. b"RQdf/Lu/ckKE9xU5VRuNehbzTr6789a+P2lt2zk5cFqe3N2289+j/hfH2X39/+nvc5vTW"
  1073. b"a/+83pvWqY3e93JWYsmup693HzCOPBk0LI9O7PtiqawN9y8eaTV75DLLL2dNWqTLsTsOn"
  1074. b"7wy0fTe5oLH//7eNf89Co3dRUHJmLRh20s/xhYJkoeYdBgYEhJLEkEJ4uKKkgKIJQyjI3"
  1075. b"gKeOveVVEFAMDY6bSPTMmBkVGMWAqKdF/uviB+n/GwlgGce49MrWMUw/IetlVih46o7Y4"
  1076. b"0uZe/t9lt85aMUrdWhjueTHRd1nr1uK830feH74vcPKU2pkbP4SZnta5PhC9dfPTqvv7f"
  1077. b"n068XRDRDzLuv8Oa5p1L+02ZN127vp6mzSzzFqpLkmbwyl131J1xW58YlcxXSWs0PTbpT"
  1078. b"z28ZUnE/e+NN93weAd40a/zzJ7+Re/v+R7+f3VBVFJCyZsv523ySJ12t7Nt5b8uBu8zuJ"
  1079. b"2Laer//nZCkbXlxtYXvvA8+VSVsCRpo8BawtftKWyZBjkWa6/0X7qXfbF9reH/ro6S63Y"
  1080. b"rCj8t8cltPIOj9H/8LyIxj6bMsZVVtu+ngj6MCNV5JXhOs07RXWxrb3xsqJMDRksx/5bO"
  1081. b"bNtevXz2cdpzzI19Roede4NXxAyK9Dlrtp8JtELLNPWbBe9HfJlj1Hiv69erIFBnX/Pe1"
  1082. b"4QnzLD+p2AiTc383/P+7sW3WoxnXra49iJKJeZy7gc9Z02S57qrvWW3day501VhsbPtfK"
  1083. b"C5nyBG9qjr08E59KY1vUTGRg7mRsCGBimFa+3sTPg7WYCSTBGRgEAzEOeH04EAAA="
  1084. )]
  1085. def run_cmd(self, vm, cmd, user="root"):
  1086. '''Run a command *cmd* in a *vm* as *user*. Return its exit code.
  1087. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1088. :param qubes.vm.qubesvm.QubesVM vm: VM object to run command in
  1089. :param str cmd: command to execute
  1090. :param std user: user to execute command as
  1091. :return int: command exit code
  1092. '''
  1093. try:
  1094. self.loop.run_until_complete(vm.run_for_stdio(cmd))
  1095. except subprocess.CalledProcessError as e:
  1096. return e.returncode
  1097. return 0
  1098. def assertRunCommandReturnCode(self, vm, cmd, expected_returncode):
  1099. p = self.loop.run_until_complete(
  1100. vm.run(cmd, user='root',
  1101. stdout=subprocess.PIPE, stderr=subprocess.PIPE))
  1102. (stdout, stderr) = self.loop.run_until_complete(p.communicate())
  1103. self.assertIn(
  1104. self.loop.run_until_complete(p.wait()), expected_returncode,
  1105. '{}: {}\n{}'.format(cmd, stdout, stderr))
  1106. def setUp(self):
  1107. '''
  1108. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1109. '''
  1110. if not self.template.count('debian') and \
  1111. not self.template.count('fedora'):
  1112. self.skipTest("Template {} not supported by this test".format(
  1113. self.template))
  1114. super(VmUpdatesMixin, self).setUp()
  1115. self.update_cmd = None
  1116. if self.template.count("debian"):
  1117. self.update_cmd = "set -o pipefail; apt-get update 2>&1 | " \
  1118. "{ ! grep '^W:\|^E:'; }"
  1119. self.upgrade_cmd = "apt-get -V dist-upgrade -y"
  1120. self.install_cmd = "apt-get install -y {}"
  1121. self.install_test_cmd = "dpkg -l {}"
  1122. self.exit_code_ok = [0]
  1123. elif self.template.count("fedora"):
  1124. cmd = "yum"
  1125. try:
  1126. # assume template name in form "fedora-XX-suffix"
  1127. if int(self.template.split("-")[1]) > 21:
  1128. cmd = "dnf"
  1129. except ValueError:
  1130. pass
  1131. self.update_cmd = "{cmd} clean all; {cmd} check-update".format(
  1132. cmd=cmd)
  1133. self.upgrade_cmd = "{cmd} upgrade -y".format(cmd=cmd)
  1134. self.install_cmd = cmd + " install -y {}"
  1135. self.install_test_cmd = "rpm -q {}"
  1136. self.exit_code_ok = [0, 100]
  1137. self.init_default_template(self.template)
  1138. self.init_networking()
  1139. self.testvm1 = self.app.add_new_vm(
  1140. qubes.vm.appvm.AppVM,
  1141. name=self.make_vm_name('vm1'),
  1142. label='red')
  1143. self.loop.run_until_complete(self.testvm1.create_on_disk())
  1144. def test_000_simple_update(self):
  1145. '''
  1146. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1147. '''
  1148. self.app.save()
  1149. self.testvm1 = self.app.domains[self.testvm1.qid]
  1150. self.loop.run_until_complete(self.testvm1.start())
  1151. self.assertRunCommandReturnCode(self.testvm1,
  1152. self.update_cmd, self.exit_code_ok)
    def create_repo_apt(self, version=0):
        '''Build a minimal APT repository in ``/tmp/apt-repo`` of the
        ``netvm_repo`` VM.

        Three shell snippets are run in that VM: unpack the test package,
        write the ``Packages`` index (plus a gzipped copy) and write the
        ``Release`` file listing both indexes.

        :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
        :param int version: index into ``DEB_PACKAGE_GZIP_BASE64``; also used
            as the minor version of the package (``1.<version>-1``)
        '''
        pkg_file_name = "test-pkg_1.{}-1_amd64.deb".format(version)
        # step 1: the base64+gzip encoded package is passed as input and
        # decoded into the repository directory
        self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
            mkdir -p /tmp/apt-repo \
            && cd /tmp/apt-repo \
            && base64 -d | zcat > {}
            '''.format(pkg_file_name),
            input=self.DEB_PACKAGE_GZIP_BASE64[version]))
        # do not assume dpkg-scanpackage installed
        packages_path = "dists/test/main/binary-amd64/Packages"
        # step 2: write the Packages stanza from the input, append checksums
        # computed with openssl, substitute the @SIZE@ placeholder with the
        # real file size and create the gzipped copy of the index
        self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
            mkdir -p /tmp/apt-repo/dists/test/main/binary-amd64 \
            && cd /tmp/apt-repo \
            && cat > {packages} \
            && echo MD5sum: $(openssl md5 -r {pkg} | cut -f 1 -d ' ') \
                >> {packages} \
            && echo SHA1: $(openssl sha1 -r {pkg} | cut -f 1 -d ' ') \
                >> {packages} \
            && echo SHA256: $(openssl sha256 -r {pkg} | cut -f 1 -d ' ') \
                >> {packages} \
            && sed -i -e "s,@SIZE@,$(stat -c %s {pkg})," {packages} \
            && gzip < {packages} > {packages}.gz
            '''.format(pkg=pkg_file_name, packages=packages_path),
            input='''\
Package: test-pkg
Version: 1.{version}-1
Architecture: amd64
Maintainer: unknown <user@host>
Installed-Size: 25
Filename: {pkg}
Size: @SIZE@
Section: unknown
Priority: optional
Description: Test package'''.format(pkg=pkg_file_name, version=version).encode(
                'utf-8')))

        # step 3: write the Release header from the input and append, under
        # its trailing "SHA256:" line, one " <sha256> <size> <path>" entry
        # for each of the two index files
        self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
            mkdir -p /tmp/apt-repo/dists/test \
            && cd /tmp/apt-repo/dists/test \
            && cat > Release \
            && echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}\
                >> Release \
            && echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}\
                >> Release
            '''.format(p='main/binary-amd64/Packages',
                       z='main/binary-amd64/Packages.gz'),
            input=b'''\
Label: Test repo
Suite: test
Codename: test
Date: Tue, 27 Oct 2015 03:22:09 UTC
Architectures: amd64
Components: main
SHA256:
'''))
  1210. def create_repo_yum(self, version=0):
  1211. '''
  1212. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1213. '''
  1214. pkg_file_name = "test-pkg-1.{}-1.fc21.x86_64.rpm".format(version)
  1215. self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
  1216. mkdir -p /tmp/yum-repo \
  1217. && cd /tmp/yum-repo \
  1218. && base64 -d | zcat > {}
  1219. '''.format(pkg_file_name), input=self.RPM_PACKAGE_GZIP_BASE64[
  1220. version]))
  1221. # createrepo is installed by default in Fedora template
  1222. self.loop.run_until_complete(self.netvm_repo.run_for_stdio(
  1223. 'createrepo /tmp/yum-repo'))
  1224. def create_repo_and_serve(self):
  1225. '''
  1226. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1227. '''
  1228. if self.template.count("debian") or self.template.count("whonix"):
  1229. self.create_repo_apt()
  1230. self.loop.run_until_complete(self.netvm_repo.run(
  1231. 'cd /tmp/apt-repo && python -m SimpleHTTPServer 8080',
  1232. stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL))
  1233. elif self.template.count("fedora"):
  1234. self.create_repo_yum()
  1235. self.loop.run_until_complete(self.netvm_repo.run(
  1236. 'cd /tmp/yum-repo && python -m SimpleHTTPServer 8080',
  1237. stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL))
  1238. else:
  1239. # not reachable...
  1240. self.skipTest("Template {} not supported by this test".format(
  1241. self.template))
  1242. def add_update_to_repo(self):
  1243. if self.template.count("debian") or self.template.count("whonix"):
  1244. self.create_repo_apt(1)
  1245. elif self.template.count("fedora"):
  1246. self.create_repo_yum(1)
  def configure_test_repo(self):
      """
      Point the package manager in ``self.testvm1`` at the test repository
      and remove every other repository definition.

      The repository URL deliberately uses "localhost": it is only
      reachable when the request goes through the updates proxy, which is
      exactly the path this test wants to exercise.

      Any template other than Debian, Whonix or Fedora makes the test skip.

      :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
      """
      template_name = self.template
      if "debian" in template_name or "whonix" in template_name:
          # Drop all extra source lists, then overwrite sources.list.
          setup_cmd = "".join((
              "rm -f /etc/apt/sources.list.d/* &&",
              "echo 'deb [trusted=yes] http://localhost:8080 test main' ",
              "> /etc/apt/sources.list",
          ))
      elif "fedora" in template_name:
          # Drop all repo files, then write a minimal test.repo line by line.
          setup_cmd = "".join((
              "rm -f /etc/yum.repos.d/*.repo &&",
              "echo '[test]' > /etc/yum.repos.d/test.repo &&",
              "echo 'name=Test repo' >> /etc/yum.repos.d/test.repo &&",
              "echo 'gpgcheck=0' >> /etc/yum.repos.d/test.repo &&",
              "echo 'baseurl=http://localhost:8080/'",
              " >> /etc/yum.repos.d/test.repo",
          ))
      else:
          # not reachable...
          self.skipTest("Template {} not supported by this test".format(
              self.template))
          return

      self.loop.run_until_complete(
          self.testvm1.run_for_stdio(setup_cmd, user="root"))
  def test_010_update_via_proxy(self):
      '''
      Check that the updates proxy works and that the VM really uses it.

      A fresh network-providing AppVM hosts both the test repository and
      the updates proxy service; ``self.testvm1`` is attached to it and
      configured with a repository URL on "localhost" (see
      ``configure_test_repo``). The test then refreshes metadata, installs
      ``test-pkg`` and verifies the installation.

      Templates whose name contains "minimal" are skipped.

      :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
      '''
      if "minimal" in self.template:
          self.skipTest("Template {} not supported by this test".format(
              self.template))

      repo_vm = self.app.add_new_vm(
          qubes.vm.appvm.AppVM,
          name=self.make_vm_name('net'),
          label='red')
      self.netvm_repo = repo_vm
      repo_vm.provides_network = True
      self.loop.run_until_complete(repo_vm.create_on_disk())

      client_vm = self.testvm1
      client_vm.netvm = repo_vm
      repo_vm.features['service.qubes-updates-proxy'] = True
      # TODO: consider also adding a test for the template itself
      client_vm.features['service.updates-proxy-setup'] = True
      self.app.save()

      # Setup test repo
      self.loop.run_until_complete(repo_vm.start())
      self.create_repo_and_serve()

      # Configure local repo
      self.loop.run_until_complete(client_vm.start())
      self.configure_test_repo()

      policy_action = 'allow,target=' + repo_vm.name
      with self.qrexec_policy('qubes.UpdatesProxy', client_vm,
              '$default', action=policy_action):
          steps = (
              # update repository metadata
              self.update_cmd,
              # install test package
              self.install_cmd.format('test-pkg'),
              # verify if it was really installed
              self.install_test_cmd.format('test-pkg'),
          )
          for command in steps:
              self.assertRunCommandReturnCode(
                  client_vm, command, self.exit_code_ok)
  def test_020_updates_available_notification(self):
      '''
      Check that the ``updates-available`` feature follows the repository.

      A StandaloneVM cloned from the template replaces ``self.testvm1`` and
      also serves as the repository host. The feature is expected to be
      unset/false before and after installing ``test-pkg``, true once an
      update has been added to the repository and the status script has
      been run, and false again after upgrading.

      :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
      '''
      # override with StandaloneVM
      standalone = self.app.add_new_vm(
          qubes.vm.standalonevm.StandaloneVM,
          name=self.make_vm_name('vm2'),
          label='red')
      self.testvm1 = standalone
      tpl = self.app.domains[self.template]
      standalone.clone_properties(tpl)
      standalone.features.update(tpl.features)
      self.loop.run_until_complete(standalone.clone_disk_files(tpl))
      self.loop.run_until_complete(standalone.start())
      # The same VM hosts the repository and consumes it.
      self.netvm_repo = standalone
      self.create_repo_and_serve()
      self.configure_test_repo()

      def run_status_notify():
          # Run the in-VM updates status script as root.
          self.loop.run_until_complete(
              standalone.run_for_stdio(
                  '/usr/lib/qubes/upgrades-status-notify',
                  user='root',
              ))

      def updates_flag():
          return standalone.features.get('updates-available', False)

      def run_ok(command):
          self.assertRunCommandReturnCode(
              standalone, command, self.exit_code_ok)

      run_status_notify()
      self.assertFalse(updates_flag())

      # update repository metadata
      run_ok(self.update_cmd)
      # install test package
      run_ok(self.install_cmd.format('test-pkg'))
      self.assertFalse(updates_flag())

      self.add_update_to_repo()
      # update repository metadata
      run_ok(self.update_cmd)
      run_status_notify()
      self.assertTrue(updates_flag())

      # install updates
      run_ok(self.upgrade_cmd)
      self.assertFalse(updates_flag())
  def create_testcases_for_templates():
      '''
      Yield everything ``qubes.tests.create_testcases_for_templates``
      produces for each mixin of this module, in order: networking, IPv6
      networking, updates.

      Every mixin is combined with ``qubes.tests.SystemTestCase`` and
      registered against this module.
      '''
      this_module = sys.modules[__name__]
      suites = (
          ('VmNetworking', VmNetworkingMixin),
          ('VmIPv6Networking', VmIPv6NetworkingMixin),
          ('VmUpdates', VmUpdatesMixin),
      )
      for prefix, mixin in suites:
          yield from qubes.tests.create_testcases_for_templates(
              prefix, mixin, qubes.tests.SystemTestCase,
              module=this_module)
  def load_tests(loader, tests, pattern):
      '''
      ``unittest`` ``load_tests`` hook: add the per-template test cases.

      :param loader: test loader used to resolve the generated names
      :param tests: suite to extend; returned after extension
      :param pattern: unused
      :return: ``tests`` with the generated test cases added
      '''
      generated = loader.loadTestsFromNames(create_testcases_for_templates())
      tests.addTests(generated)
      return tests
  # Hand the generator function to the shared test helper at import time.
  # NOTE(review): presumably this instantiates the per-template test classes
  # when the module is imported outside the load_tests path - confirm in
  # qubes.tests.
  qubes.tests.maybe_create_testcases_on_import(
      create_testcases_for_templates)