network.py 63 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516
  1. #
  2. # The Qubes OS Project, https://www.qubes-os.org/
  3. #
  4. # Copyright (C) 2015
  5. # Marek Marczykowski-Górecki <marmarek@invisiblethingslab.com>
  6. # Copyright (C) 2015 Wojtek Porczyk <woju@invisiblethingslab.com>
  7. #
  8. # This library is free software; you can redistribute it and/or
  9. # modify it under the terms of the GNU Lesser General Public
  10. # License as published by the Free Software Foundation; either
  11. # version 2.1 of the License, or (at your option) any later version.
  12. #
  13. # This library is distributed in the hope that it will be useful,
  14. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  15. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  16. # Lesser General Public License for more details.
  17. #
  18. # You should have received a copy of the GNU Lesser General Public
  19. # License along with this library; if not, see <https://www.gnu.org/licenses/>.
  20. #
  21. from distutils import spawn
  22. import asyncio
  23. import subprocess
  24. import sys
  25. import time
  26. import unittest
  27. import qubes.tests
  28. import qubes.firewall
  29. import qubes.vm.qubesvm
  30. import qubes.vm.appvm
  31. # noinspection PyAttributeOutsideInit,PyPep8Naming
  32. class VmNetworkingMixin(object):
  33. test_ip = '192.168.123.45'
  34. test_name = 'test.example.com'
  35. ping_cmd = 'ping -W 1 -n -c 1 {target}'
  36. ping_ip = ping_cmd.format(target=test_ip)
  37. ping_name = ping_cmd.format(target=test_name)
  38. # filled by load_tests
  39. template = None
  40. def run_cmd(self, vm, cmd, user="root"):
  41. '''Run a command *cmd* in a *vm* as *user*. Return its exit code.
  42. :type self: qubes.tests.SystemTestCase | VmNetworkingMixin
  43. :param qubes.vm.qubesvm.QubesVM vm: VM object to run command in
  44. :param str cmd: command to execute
  45. :param std user: user to execute command as
  46. :return int: command exit code
  47. '''
  48. try:
  49. self.loop.run_until_complete(vm.run_for_stdio(cmd, user=user))
  50. except subprocess.CalledProcessError as e:
  51. return e.returncode
  52. return 0
  53. def setUp(self):
  54. '''
  55. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  56. '''
  57. super(VmNetworkingMixin, self).setUp()
  58. if self.template.startswith('whonix-'):
  59. self.skipTest("Test not supported here - Whonix uses its own "
  60. "firewall settings")
  61. self.init_default_template(self.template)
  62. self.testnetvm = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  63. name=self.make_vm_name('netvm1'),
  64. label='red')
  65. self.loop.run_until_complete(self.testnetvm.create_on_disk())
  66. self.testnetvm.provides_network = True
  67. self.testnetvm.netvm = None
  68. self.testvm1 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  69. name=self.make_vm_name('vm1'),
  70. label='red')
  71. self.loop.run_until_complete(self.testvm1.create_on_disk())
  72. self.testvm1.netvm = self.testnetvm
  73. self.app.save()
  74. self.configure_netvm()
  75. def configure_netvm(self):
  76. '''
  77. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  78. '''
  79. def run_netvm_cmd(cmd):
  80. if self.run_cmd(self.testnetvm, cmd) != 0:
  81. self.fail("Command '%s' failed" % cmd)
  82. if not self.testnetvm.is_running():
  83. self.loop.run_until_complete(self.testnetvm.start())
  84. # Ensure that dnsmasq is installed:
  85. try:
  86. self.loop.run_until_complete(self.testnetvm.run_for_stdio(
  87. 'dnsmasq --version', user='root'))
  88. except subprocess.CalledProcessError:
  89. self.skipTest("dnsmasq not installed")
  90. run_netvm_cmd("ip link add test0 type dummy")
  91. run_netvm_cmd("ip link set test0 up")
  92. run_netvm_cmd("ip addr add {}/24 dev test0".format(self.test_ip))
  93. run_netvm_cmd("iptables -I INPUT -d {} -j ACCEPT --wait".format(
  94. self.test_ip))
  95. # ignore failure
  96. self.run_cmd(self.testnetvm, "killall --wait dnsmasq")
  97. run_netvm_cmd("dnsmasq -a {ip} -A /{name}/{ip} -i test0 -z".format(
  98. ip=self.test_ip, name=self.test_name))
  99. run_netvm_cmd("echo nameserver {} > /etc/resolv.conf".format(
  100. self.test_ip))
  101. run_netvm_cmd("/usr/lib/qubes/qubes-setup-dnat-to-ns")
  102. def test_000_simple_networking(self):
  103. '''
  104. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  105. '''
  106. self.loop.run_until_complete(self.testvm1.start())
  107. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  108. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  109. def test_010_simple_proxyvm(self):
  110. '''
  111. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  112. '''
  113. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  114. name=self.make_vm_name('proxy'),
  115. label='red')
  116. self.proxy.provides_network = True
  117. self.proxy.netvm = self.testnetvm
  118. self.loop.run_until_complete(self.proxy.create_on_disk())
  119. self.testvm1.netvm = self.proxy
  120. self.app.save()
  121. self.loop.run_until_complete(self.testvm1.start())
  122. self.assertTrue(self.proxy.is_running())
  123. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  124. "Ping by IP from ProxyVM failed")
  125. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  126. "Ping by name from ProxyVM failed")
  127. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  128. "Ping by IP from AppVM failed")
  129. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  130. "Ping by IP from AppVM failed")
  131. @qubes.tests.expectedFailureIfTemplate('debian-7')
  132. @unittest.skipUnless(spawn.find_executable('xdotool'),
  133. "xdotool not installed")
  134. def test_020_simple_proxyvm_nm(self):
  135. '''
  136. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  137. '''
  138. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  139. name=self.make_vm_name('proxy'),
  140. label='red')
  141. self.proxy.provides_network = True
  142. self.loop.run_until_complete(self.proxy.create_on_disk())
  143. self.proxy.netvm = self.testnetvm
  144. self.proxy.features['service.network-manager'] = True
  145. self.testvm1.netvm = self.proxy
  146. self.app.save()
  147. self.loop.run_until_complete(self.testvm1.start())
  148. self.assertTrue(self.proxy.is_running())
  149. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  150. "Ping by IP failed")
  151. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  152. "Ping by name failed")
  153. # reconnect to make sure that device was configured by NM
  154. self.assertEqual(
  155. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  156. user="user"),
  157. 0, "Failed to disconnect eth0 using nmcli")
  158. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  159. "Network should be disabled, but apparently it isn't")
  160. self.assertEqual(
  161. self.run_cmd(self.proxy,
  162. 'nmcli connection up "VM uplink eth0" ifname eth0',
  163. user="user"),
  164. 0, "Failed to connect eth0 using nmcli")
  165. self.assertEqual(self.run_cmd(self.proxy, "nm-online", user="user"), 0,
  166. "Failed to wait for NM connection")
  167. # check for nm-applet presence
  168. self.assertEqual(subprocess.call([
  169. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  170. self.proxy.name)],
  171. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  172. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  173. "Ping by IP failed (after NM reconnection")
  174. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  175. "Ping by name failed (after NM reconnection)")
  176. def test_030_firewallvm_firewall(self):
  177. '''
  178. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  179. '''
  180. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  181. name=self.make_vm_name('proxy'),
  182. label='red')
  183. self.proxy.provides_network = True
  184. self.loop.run_until_complete(self.proxy.create_on_disk())
  185. self.proxy.netvm = self.testnetvm
  186. self.testvm1.netvm = self.proxy
  187. self.app.save()
  188. # block all for first
  189. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  190. self.testvm1.firewall.save()
  191. self.loop.run_until_complete(self.testvm1.start())
  192. self.assertTrue(self.proxy.is_running())
  193. server = self.loop.run_until_complete(self.testnetvm.run(
  194. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  195. try:
  196. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  197. "Ping by IP from ProxyVM failed")
  198. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  199. "Ping by name from ProxyVM failed")
  200. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  201. "Ping by IP should be blocked")
  202. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  203. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  204. "TCP connection should be blocked")
  205. # block all except ICMP
  206. self.testvm1.firewall.rules = [(
  207. qubes.firewall.Rule(None, action='accept', proto='icmp')
  208. )]
  209. self.testvm1.firewall.save()
  210. # Ugly hack b/c there is no feedback when the rules are actually
  211. # applied
  212. time.sleep(3)
  213. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  214. "Ping by IP failed (should be allowed now)")
  215. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  216. "Ping by name should be blocked")
  217. # all TCP still blocked
  218. self.testvm1.firewall.rules = [
  219. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  220. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  221. ]
  222. self.testvm1.firewall.save()
  223. # Ugly hack b/c there is no feedback when the rules are actually
  224. # applied
  225. time.sleep(3)
  226. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  227. "Ping by name failed (should be allowed now)")
  228. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  229. "TCP connection should be blocked")
  230. # block all except target
  231. self.testvm1.firewall.rules = [
  232. qubes.firewall.Rule(None, action='accept', dsthost=self.test_ip,
  233. proto='tcp', dstports=1234),
  234. ]
  235. self.testvm1.firewall.save()
  236. # Ugly hack b/c there is no feedback when the rules are actually
  237. # applied
  238. time.sleep(3)
  239. self.assertEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  240. "TCP connection failed (should be allowed now)")
  241. # allow all except target
  242. self.testvm1.firewall.rules = [
  243. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip,
  244. proto='tcp', dstports=1234),
  245. qubes.firewall.Rule(action='accept'),
  246. ]
  247. self.testvm1.firewall.save()
  248. # Ugly hack b/c there is no feedback when the rules are actually
  249. # applied
  250. time.sleep(3)
  251. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  252. "TCP connection should be blocked")
  253. finally:
  254. server.terminate()
  255. self.loop.run_until_complete(server.wait())
  256. def test_040_inter_vm(self):
  257. '''
  258. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  259. '''
  260. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  261. name=self.make_vm_name('proxy'),
  262. label='red')
  263. self.loop.run_until_complete(self.proxy.create_on_disk())
  264. self.proxy.provides_network = True
  265. self.proxy.netvm = self.testnetvm
  266. self.testvm1.netvm = self.proxy
  267. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  268. name=self.make_vm_name('vm2'),
  269. label='red')
  270. self.loop.run_until_complete(self.testvm2.create_on_disk())
  271. self.testvm2.netvm = self.proxy
  272. self.app.save()
  273. self.loop.run_until_complete(asyncio.wait([
  274. self.testvm1.start(),
  275. self.testvm2.start()]))
  276. self.assertNotEqual(self.run_cmd(self.testvm1,
  277. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  278. self.testvm2.netvm = self.testnetvm
  279. self.assertNotEqual(self.run_cmd(self.testvm1,
  280. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  281. self.assertNotEqual(self.run_cmd(self.testvm2,
  282. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  283. self.testvm1.netvm = self.testnetvm
  284. self.assertNotEqual(self.run_cmd(self.testvm1,
  285. self.ping_cmd.format(target=self.testvm2.ip)), 0)
  286. self.assertNotEqual(self.run_cmd(self.testvm2,
  287. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  288. def test_050_spoof_ip(self):
  289. '''Test if VM IP spoofing is blocked
  290. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  291. '''
  292. self.loop.run_until_complete(self.testvm1.start())
  293. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  294. self.assertEqual(self.run_cmd(self.testnetvm,
  295. 'iptables -I INPUT -i vif+ ! -s {} -p icmp -j LOG'.format(
  296. self.testvm1.ip)), 0)
  297. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  298. 'ip addr flush dev eth0 && '
  299. 'ip addr add 10.137.1.128/24 dev eth0 && '
  300. 'ip route add default dev eth0',
  301. user='root'))
  302. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  303. "Spoofed ping should be blocked")
  304. try:
  305. (output, _) = self.loop.run_until_complete(
  306. self.testnetvm.run_for_stdio('iptables -nxvL INPUT',
  307. user='root'))
  308. except subprocess.CalledProcessError:
  309. self.fail('iptables -nxvL INPUT failed')
  310. output = output.decode().splitlines()
  311. packets = output[2].lstrip().split()[0]
  312. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  313. def test_100_late_xldevd_startup(self):
  314. '''Regression test for #1990
  315. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  316. '''
  317. # Simulater late xl devd startup
  318. cmd = "systemctl stop xendriverdomain"
  319. if self.run_cmd(self.testnetvm, cmd) != 0:
  320. self.fail("Command '%s' failed" % cmd)
  321. self.loop.run_until_complete(self.testvm1.start())
  322. cmd = "systemctl start xendriverdomain"
  323. if self.run_cmd(self.testnetvm, cmd) != 0:
  324. self.fail("Command '%s' failed" % cmd)
  325. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  326. def test_200_fake_ip_simple(self):
  327. '''Test hiding VM real IP
  328. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  329. '''
  330. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  331. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  332. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  333. self.app.save()
  334. self.loop.run_until_complete(self.testvm1.start())
  335. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  336. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  337. try:
  338. (output, _) = self.loop.run_until_complete(
  339. self.testvm1.run_for_stdio(
  340. 'ip addr show dev eth0', user='root'))
  341. except subprocess.CalledProcessError:
  342. self.fail('ip addr show dev eth0 failed')
  343. output = output.decode()
  344. self.assertIn('192.168.1.128', output)
  345. self.assertNotIn(str(self.testvm1.ip), output)
  346. try:
  347. (output, _) = self.loop.run_until_complete(
  348. self.testvm1.run_for_stdio('ip route show', user='root'))
  349. except subprocess.CalledProcessError:
  350. self.fail('ip route show failed')
  351. output = output.decode()
  352. self.assertIn('192.168.1.1', output)
  353. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  354. def test_201_fake_ip_without_gw(self):
  355. '''Test hiding VM real IP
  356. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  357. '''
  358. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  359. self.app.save()
  360. self.loop.run_until_complete(self.testvm1.start())
  361. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  362. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  363. try:
  364. (output, _) = self.loop.run_until_complete(
  365. self.testvm1.run_for_stdio('ip addr show dev eth0',
  366. user='root'))
  367. except subprocess.CalledProcessError:
  368. self.fail('ip addr show dev eth0 failed')
  369. output = output.decode()
  370. self.assertIn('192.168.1.128', output)
  371. self.assertNotIn(str(self.testvm1.ip), output)
  372. def test_202_fake_ip_firewall(self):
  373. '''Test hiding VM real IP, firewall
  374. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  375. '''
  376. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  377. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  378. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  379. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  380. name=self.make_vm_name('proxy'),
  381. label='red')
  382. self.proxy.provides_network = True
  383. self.loop.run_until_complete(self.proxy.create_on_disk())
  384. self.proxy.netvm = self.testnetvm
  385. self.testvm1.netvm = self.proxy
  386. self.app.save()
  387. # block all but ICMP and DNS
  388. self.testvm1.firewall.rules = [
  389. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  390. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  391. ]
  392. self.testvm1.firewall.save()
  393. self.loop.run_until_complete(self.testvm1.start())
  394. self.assertTrue(self.proxy.is_running())
  395. server = self.loop.run_until_complete(self.testnetvm.run(
  396. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  397. try:
  398. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  399. "Ping by IP from ProxyVM failed")
  400. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  401. "Ping by name from ProxyVM failed")
  402. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  403. "Ping by IP should be allowed")
  404. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  405. "Ping by name should be allowed")
  406. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  407. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  408. "TCP connection should be blocked")
  409. finally:
  410. server.terminate()
  411. self.loop.run_until_complete(server.wait())
  412. def test_203_fake_ip_inter_vm_allow(self):
  413. '''Access VM with "fake IP" from other VM (when firewall allows)
  414. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  415. '''
  416. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  417. name=self.make_vm_name('proxy'),
  418. label='red')
  419. self.loop.run_until_complete(self.proxy.create_on_disk())
  420. self.proxy.provides_network = True
  421. self.proxy.netvm = self.testnetvm
  422. self.testvm1.netvm = self.proxy
  423. self.testvm1.features['net.fake-ip'] = '192.168.1.128'
  424. self.testvm1.features['net.fake-gateway'] = '192.168.1.1'
  425. self.testvm1.features['net.fake-netmask'] = '255.255.255.0'
  426. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  427. name=self.make_vm_name('vm2'),
  428. label='red')
  429. self.loop.run_until_complete(self.testvm2.create_on_disk())
  430. self.testvm2.netvm = self.proxy
  431. self.app.save()
  432. self.loop.run_until_complete(self.testvm1.start())
  433. self.loop.run_until_complete(self.testvm2.start())
  434. cmd = 'iptables -I FORWARD -s {} -d {} -j ACCEPT'.format(
  435. self.testvm2.ip, self.testvm1.ip)
  436. try:
  437. self.loop.run_until_complete(self.proxy.run_for_stdio(
  438. cmd, user='root'))
  439. except subprocess.CalledProcessError as e:
  440. raise AssertionError(
  441. '{} failed with: {}'.format(cmd, e.returncode)) from None
  442. try:
  443. cmd = 'iptables -I INPUT -s {} -j ACCEPT'.format(self.testvm2.ip)
  444. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  445. cmd, user='root'))
  446. except subprocess.CalledProcessError as e:
  447. raise AssertionError(
  448. '{} failed with: {}'.format(cmd, e.returncode)) from None
  449. self.assertEqual(self.run_cmd(self.testvm2,
  450. self.ping_cmd.format(target=self.testvm1.ip)), 0)
  451. try:
  452. cmd = 'iptables -nvxL INPUT | grep {}'.format(self.testvm2.ip)
  453. (stdout, _) = self.loop.run_until_complete(
  454. self.testvm1.run_for_stdio(cmd, user='root'))
  455. except subprocess.CalledProcessError as e:
  456. raise AssertionError(
  457. '{} failed with {}'.format(cmd, e.returncode)) from None
  458. self.assertNotEqual(stdout.decode().split()[0], '0',
  459. 'Packets didn\'t managed to the VM')
  460. def test_204_fake_ip_proxy(self):
  461. '''Test hiding VM real IP
  462. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  463. '''
  464. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  465. name=self.make_vm_name('proxy'),
  466. label='red')
  467. self.loop.run_until_complete(self.proxy.create_on_disk())
  468. self.proxy.provides_network = True
  469. self.proxy.netvm = self.testnetvm
  470. self.proxy.features['net.fake-ip'] = '192.168.1.128'
  471. self.proxy.features['net.fake-gateway'] = '192.168.1.1'
  472. self.proxy.features['net.fake-netmask'] = '255.255.255.0'
  473. self.testvm1.netvm = self.proxy
  474. self.app.save()
  475. self.loop.run_until_complete(self.testvm1.start())
  476. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0)
  477. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0)
  478. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  479. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  480. try:
  481. (output, _) = self.loop.run_until_complete(
  482. self.proxy.run_for_stdio(
  483. 'ip addr show dev eth0', user='root'))
  484. except subprocess.CalledProcessError:
  485. self.fail('ip addr show dev eth0 failed')
  486. output = output.decode()
  487. self.assertIn('192.168.1.128', output)
  488. self.assertNotIn(str(self.testvm1.ip), output)
  489. try:
  490. (output, _) = self.loop.run_until_complete(
  491. self.proxy.run_for_stdio(
  492. 'ip route show', user='root'))
  493. except subprocess.CalledProcessError:
  494. self.fail('ip route show failed')
  495. output = output.decode()
  496. self.assertIn('192.168.1.1', output)
  497. self.assertNotIn(str(self.testvm1.netvm.ip), output)
  498. try:
  499. (output, _) = self.loop.run_until_complete(
  500. self.testvm1.run_for_stdio(
  501. 'ip addr show dev eth0', user='root'))
  502. except subprocess.CalledProcessError:
  503. self.fail('ip addr show dev eth0 failed')
  504. output = output.decode()
  505. self.assertNotIn('192.168.1.128', output)
  506. self.assertIn(str(self.testvm1.ip), output)
  507. try:
  508. (output, _) = self.loop.run_until_complete(
  509. self.testvm1.run_for_stdio(
  510. 'ip route show', user='root'))
  511. except subprocess.CalledProcessError:
  512. self.fail('ip route show failed')
  513. output = output.decode()
  514. self.assertIn('192.168.1.128', output)
  515. self.assertNotIn(str(self.proxy.ip), output)
  516. def test_210_custom_ip_simple(self):
  517. '''Custom AppVM IP
  518. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  519. '''
  520. self.testvm1.ip = '192.168.1.1'
  521. self.app.save()
  522. self.loop.run_until_complete(self.testvm1.start())
  523. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  524. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  525. def test_211_custom_ip_proxy(self):
  526. '''Custom ProxyVM IP
  527. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  528. '''
  529. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  530. name=self.make_vm_name('proxy'),
  531. label='red')
  532. self.loop.run_until_complete(self.proxy.create_on_disk())
  533. self.proxy.provides_network = True
  534. self.proxy.netvm = self.testnetvm
  535. self.proxy.ip = '192.168.1.1'
  536. self.testvm1.netvm = self.proxy
  537. self.app.save()
  538. self.loop.run_until_complete(self.testvm1.start())
  539. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0)
  540. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0)
  541. def test_212_custom_ip_firewall(self):
  542. '''Custom VM IP and firewall
  543. :type self: qubes.tests.SystemTestCase | VMNetworkingMixin
  544. '''
  545. self.testvm1.ip = '192.168.1.1'
  546. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  547. name=self.make_vm_name('proxy'),
  548. label='red')
  549. self.proxy.provides_network = True
  550. self.loop.run_until_complete(self.proxy.create_on_disk())
  551. self.proxy.netvm = self.testnetvm
  552. self.testvm1.netvm = self.proxy
  553. self.app.save()
  554. # block all but ICMP and DNS
  555. self.testvm1.firewall.rules = [
  556. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  557. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  558. ]
  559. self.testvm1.firewall.save()
  560. self.loop.run_until_complete(self.testvm1.start())
  561. self.assertTrue(self.proxy.is_running())
  562. server = self.loop.run_until_complete(self.testnetvm.run(
  563. 'socat TCP-LISTEN:1234,fork EXEC:/bin/uname'))
  564. try:
  565. self.assertEqual(self.run_cmd(self.proxy, self.ping_ip), 0,
  566. "Ping by IP from ProxyVM failed")
  567. self.assertEqual(self.run_cmd(self.proxy, self.ping_name), 0,
  568. "Ping by name from ProxyVM failed")
  569. self.assertEqual(self.run_cmd(self.testvm1, self.ping_ip), 0,
  570. "Ping by IP should be allowed")
  571. self.assertEqual(self.run_cmd(self.testvm1, self.ping_name), 0,
  572. "Ping by name should be allowed")
  573. client_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  574. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  575. "TCP connection should be blocked")
  576. finally:
  577. server.terminate()
  578. self.loop.run_until_complete(server.wait())
  579. # noinspection PyAttributeOutsideInit,PyPep8Naming
  580. class VmIPv6NetworkingMixin(VmNetworkingMixin):
  581. test_ip6 = '2000:abcd::1'
  582. ping6_cmd = 'ping6 -W 1 -n -c 1 {target}'
  583. def setUp(self):
  584. super(VmIPv6NetworkingMixin, self).setUp()
  585. self.ping6_ip = self.ping6_cmd.format(target=self.test_ip6)
  586. self.ping6_name = self.ping6_cmd.format(target=self.test_name)
  587. def configure_netvm(self):
  588. '''
  589. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  590. '''
  591. self.testnetvm.features['ipv6'] = True
  592. super(VmIPv6NetworkingMixin, self).configure_netvm()
  593. def run_netvm_cmd(cmd):
  594. if self.run_cmd(self.testnetvm, cmd) != 0:
  595. self.fail("Command '%s' failed" % cmd)
  596. run_netvm_cmd("ip addr add {}/128 dev test0".format(self.test_ip6))
  597. run_netvm_cmd(
  598. "ip6tables -I INPUT -d {} -j ACCEPT".format(self.test_ip6))
  599. # ignore failure
  600. self.run_cmd(self.testnetvm, "killall --wait dnsmasq")
  601. run_netvm_cmd(
  602. "dnsmasq -a {ip} -A /{name}/{ip} -A /{name}/{ip6} -i test0 -z".
  603. format(ip=self.test_ip, ip6=self.test_ip6, name=self.test_name))
  604. def test_500_ipv6_simple_networking(self):
  605. '''
  606. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  607. '''
  608. self.loop.run_until_complete(self.testvm1.start())
  609. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  610. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  611. def test_510_ipv6_simple_proxyvm(self):
  612. '''
  613. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  614. '''
  615. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  616. name=self.make_vm_name('proxy'),
  617. label='red')
  618. self.proxy.provides_network = True
  619. self.proxy.netvm = self.testnetvm
  620. self.loop.run_until_complete(self.proxy.create_on_disk())
  621. self.testvm1.netvm = self.proxy
  622. self.app.save()
  623. self.loop.run_until_complete(self.testvm1.start())
  624. self.assertTrue(self.proxy.is_running())
  625. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  626. "Ping by IP from ProxyVM failed")
  627. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  628. "Ping by name from ProxyVM failed")
  629. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  630. "Ping by IP from AppVM failed")
  631. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  632. "Ping by IP from AppVM failed")
  633. @qubes.tests.expectedFailureIfTemplate('debian-7')
  634. @unittest.skipUnless(spawn.find_executable('xdotool'),
  635. "xdotool not installed")
  636. def test_520_ipv6_simple_proxyvm_nm(self):
  637. '''
  638. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  639. '''
  640. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  641. name=self.make_vm_name('proxy'),
  642. label='red')
  643. self.proxy.provides_network = True
  644. self.loop.run_until_complete(self.proxy.create_on_disk())
  645. self.proxy.netvm = self.testnetvm
  646. self.proxy.features['service.network-manager'] = True
  647. self.testvm1.netvm = self.proxy
  648. self.app.save()
  649. self.loop.run_until_complete(self.testvm1.start())
  650. self.assertTrue(self.proxy.is_running())
  651. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  652. "Ping by IP failed")
  653. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  654. "Ping by name failed")
  655. # reconnect to make sure that device was configured by NM
  656. self.assertEqual(
  657. self.run_cmd(self.proxy, "nmcli device disconnect eth0",
  658. user="user"),
  659. 0, "Failed to disconnect eth0 using nmcli")
  660. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  661. "Network should be disabled, but apparently it isn't")
  662. self.assertEqual(
  663. self.run_cmd(self.proxy,
  664. 'nmcli connection up "VM uplink eth0" ifname eth0',
  665. user="user"),
  666. 0, "Failed to connect eth0 using nmcli")
  667. self.assertEqual(self.run_cmd(self.proxy, "nm-online",
  668. user="user"), 0,
  669. "Failed to wait for NM connection")
  670. # wait for duplicate-address-detection to complete - by default it has
  671. # 1s timeout
  672. time.sleep(2)
  673. # check for nm-applet presence
  674. self.assertEqual(subprocess.call([
  675. 'xdotool', 'search', '--class', '{}:nm-applet'.format(
  676. self.proxy.name)],
  677. stdout=subprocess.DEVNULL), 0, "nm-applet window not found")
  678. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  679. "Ping by IP failed (after NM reconnection")
  680. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  681. "Ping by name failed (after NM reconnection)")
  682. def test_530_ipv6_firewallvm_firewall(self):
  683. '''
  684. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  685. '''
  686. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  687. name=self.make_vm_name('proxy'),
  688. label='red')
  689. self.proxy.provides_network = True
  690. self.loop.run_until_complete(self.proxy.create_on_disk())
  691. self.proxy.netvm = self.testnetvm
  692. self.testvm1.netvm = self.proxy
  693. self.app.save()
  694. # block all for first
  695. self.testvm1.firewall.rules = [qubes.firewall.Rule(action='drop')]
  696. self.testvm1.firewall.save()
  697. self.loop.run_until_complete(self.testvm1.start())
  698. self.assertTrue(self.proxy.is_running())
  699. server = self.loop.run_until_complete(self.testnetvm.run(
  700. 'socat TCP6-LISTEN:1234,fork EXEC:/bin/uname'))
  701. try:
  702. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  703. "Ping by IP from ProxyVM failed")
  704. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  705. "Ping by name from ProxyVM failed")
  706. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  707. "Ping by IP should be blocked")
  708. client6_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
  709. client4_cmd = "socat TCP:{}:1234 -".format(self.test_ip)
  710. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  711. "TCP connection should be blocked")
  712. # block all except ICMP
  713. self.testvm1.firewall.rules = [(
  714. qubes.firewall.Rule(None, action='accept', proto='icmp')
  715. )]
  716. self.testvm1.firewall.save()
  717. # Ugly hack b/c there is no feedback when the rules are actually
  718. # applied
  719. time.sleep(3)
  720. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  721. "Ping by IP failed (should be allowed now)")
  722. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  723. "Ping by name should be blocked")
  724. # all TCP still blocked
  725. self.testvm1.firewall.rules = [
  726. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  727. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  728. ]
  729. self.testvm1.firewall.save()
  730. # Ugly hack b/c there is no feedback when the rules are actually
  731. # applied
  732. time.sleep(3)
  733. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  734. "Ping by name failed (should be allowed now)")
  735. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  736. "TCP connection should be blocked")
  737. # block all except target
  738. self.testvm1.firewall.rules = [
  739. qubes.firewall.Rule(None, action='accept',
  740. dsthost=self.test_ip6,
  741. proto='tcp', dstports=1234),
  742. ]
  743. self.testvm1.firewall.save()
  744. # Ugly hack b/c there is no feedback when the rules are actually
  745. # applied
  746. time.sleep(3)
  747. self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  748. "TCP connection failed (should be allowed now)")
  749. # block all except target - by name
  750. self.testvm1.firewall.rules = [
  751. qubes.firewall.Rule(None, action='accept',
  752. dsthost=self.test_name,
  753. proto='tcp', dstports=1234),
  754. ]
  755. self.testvm1.firewall.save()
  756. # Ugly hack b/c there is no feedback when the rules are actually
  757. # applied
  758. time.sleep(3)
  759. self.assertEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  760. "TCP (IPv6) connection failed (should be allowed now)")
  761. self.assertEqual(self.run_cmd(self.testvm1, client4_cmd),
  762. 0,
  763. "TCP (IPv4) connection failed (should be allowed now)")
  764. # allow all except target
  765. self.testvm1.firewall.rules = [
  766. qubes.firewall.Rule(None, action='drop', dsthost=self.test_ip6,
  767. proto='tcp', dstports=1234),
  768. qubes.firewall.Rule(action='accept'),
  769. ]
  770. self.testvm1.firewall.save()
  771. # Ugly hack b/c there is no feedback when the rules are actually
  772. # applied
  773. time.sleep(3)
  774. self.assertNotEqual(self.run_cmd(self.testvm1, client6_cmd), 0,
  775. "TCP connection should be blocked")
  776. finally:
  777. server.terminate()
  778. self.loop.run_until_complete(server.wait())
  779. def test_540_ipv6_inter_vm(self):
  780. '''
  781. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  782. '''
  783. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  784. name=self.make_vm_name('proxy'),
  785. label='red')
  786. self.loop.run_until_complete(self.proxy.create_on_disk())
  787. self.proxy.provides_network = True
  788. self.proxy.netvm = self.testnetvm
  789. self.testvm1.netvm = self.proxy
  790. self.testvm2 = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  791. name=self.make_vm_name('vm2'),
  792. label='red')
  793. self.loop.run_until_complete(self.testvm2.create_on_disk())
  794. self.testvm2.netvm = self.proxy
  795. self.app.save()
  796. self.loop.run_until_complete(asyncio.wait([
  797. self.testvm1.start(),
  798. self.testvm2.start()]))
  799. self.assertNotEqual(self.run_cmd(self.testvm1,
  800. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  801. self.testvm2.netvm = self.testnetvm
  802. self.assertNotEqual(self.run_cmd(self.testvm1,
  803. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  804. self.assertNotEqual(self.run_cmd(self.testvm2,
  805. self.ping_cmd.format(target=self.testvm1.ip6)), 0)
  806. self.testvm1.netvm = self.testnetvm
  807. self.assertNotEqual(self.run_cmd(self.testvm1,
  808. self.ping_cmd.format(target=self.testvm2.ip6)), 0)
  809. self.assertNotEqual(self.run_cmd(self.testvm2,
  810. self.ping_cmd.format(target=self.testvm1.ip6)), 0)
  811. def test_550_ipv6_spoof_ip(self):
  812. '''Test if VM IP spoofing is blocked
  813. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  814. '''
  815. self.loop.run_until_complete(self.testvm1.start())
  816. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  817. # add a simple rule counting packets
  818. self.assertEqual(self.run_cmd(self.testnetvm,
  819. 'ip6tables -I INPUT -i vif+ ! -s {} -p icmpv6 -j LOG'.format(
  820. self.testvm1.ip6)), 0)
  821. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  822. 'ip -6 addr flush dev eth0 && '
  823. 'ip -6 addr add {}/128 dev eth0 && '
  824. 'ip -6 route add default via {} dev eth0'.format(
  825. str(self.testvm1.visible_ip6) + '1',
  826. str(self.testvm1.visible_gateway6)),
  827. user='root'))
  828. self.assertNotEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  829. "Spoofed ping should be blocked")
  830. try:
  831. (output, _) = self.loop.run_until_complete(
  832. self.testnetvm.run_for_stdio('ip6tables -nxvL INPUT',
  833. user='root'))
  834. except subprocess.CalledProcessError:
  835. self.fail('ip6tables -nxvL INPUT failed')
  836. output = output.decode().splitlines()
  837. packets = output[2].lstrip().split()[0]
  838. self.assertEquals(packets, '0', 'Some packet hit the INPUT rule')
  839. def test_710_ipv6_custom_ip_simple(self):
  840. '''Custom AppVM IP
  841. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  842. '''
  843. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  844. self.app.save()
  845. self.loop.run_until_complete(self.testvm1.start())
  846. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  847. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  848. def test_711_ipv6_custom_ip_proxy(self):
  849. '''Custom ProxyVM IP
  850. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  851. '''
  852. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  853. name=self.make_vm_name('proxy'),
  854. label='red')
  855. self.loop.run_until_complete(self.proxy.create_on_disk())
  856. self.proxy.provides_network = True
  857. self.proxy.netvm = self.testnetvm
  858. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  859. self.testvm1.netvm = self.proxy
  860. self.app.save()
  861. self.loop.run_until_complete(self.testvm1.start())
  862. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0)
  863. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0)
  864. def test_712_ipv6_custom_ip_firewall(self):
  865. '''Custom VM IP and firewall
  866. :type self: qubes.tests.SystemTestCase | VmIPv6NetworkingMixin
  867. '''
  868. self.testvm1.ip6 = '2000:aaaa:bbbb::1'
  869. self.proxy = self.app.add_new_vm(qubes.vm.appvm.AppVM,
  870. name=self.make_vm_name('proxy'),
  871. label='red')
  872. self.proxy.provides_network = True
  873. self.loop.run_until_complete(self.proxy.create_on_disk())
  874. self.proxy.netvm = self.testnetvm
  875. self.testvm1.netvm = self.proxy
  876. self.app.save()
  877. # block all but ICMP and DNS
  878. self.testvm1.firewall.rules = [
  879. qubes.firewall.Rule(None, action='accept', proto='icmp'),
  880. qubes.firewall.Rule(None, action='accept', specialtarget='dns'),
  881. ]
  882. self.testvm1.firewall.save()
  883. self.loop.run_until_complete(self.testvm1.start())
  884. self.assertTrue(self.proxy.is_running())
  885. server = self.loop.run_until_complete(self.testnetvm.run(
  886. 'socat TCP6-LISTEN:1234,fork EXEC:/bin/uname'))
  887. try:
  888. self.assertEqual(self.run_cmd(self.proxy, self.ping6_ip), 0,
  889. "Ping by IP from ProxyVM failed")
  890. self.assertEqual(self.run_cmd(self.proxy, self.ping6_name), 0,
  891. "Ping by name from ProxyVM failed")
  892. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_ip), 0,
  893. "Ping by IP should be allowed")
  894. self.assertEqual(self.run_cmd(self.testvm1, self.ping6_name), 0,
  895. "Ping by name should be allowed")
  896. client_cmd = "socat TCP:[{}]:1234 -".format(self.test_ip6)
  897. self.assertNotEqual(self.run_cmd(self.testvm1, client_cmd), 0,
  898. "TCP connection should be blocked")
  899. finally:
  900. server.terminate()
  901. self.loop.run_until_complete(server.wait())
  902. # noinspection PyAttributeOutsideInit,PyPep8Naming
  903. class VmUpdatesMixin(object):
  904. """
  905. Tests for VM updates
  906. """
  907. # filled by load_tests
  908. template = None
  909. # made this way to work also when no package build tools are installed
  910. """
  911. $ cat test-pkg.spec:
  912. Name: test-pkg
  913. Version: 1.0
  914. Release: 1%{?dist}
  915. Summary: Test package
  916. Group: System
  917. License: GPL
  918. URL: http://example.com/
  919. %description
  920. Test package
  921. %files
  922. %changelog
  923. $ rpmbuild -bb test-pkg.spec
  924. $ cat test-pkg-1.0-1.fc21.x86_64.rpm | gzip | base64
  925. $ cat test-pkg-1.1-1.fc21.x86_64.rpm | gzip | base64
  926. """
  927. RPM_PACKAGE_GZIP_BASE64 = [(
  928. b"H4sIAPzRLlYAA+2Y728URRjHn7ueUCkERKJVJDnTxLSxs7293o8WOER6ljYYrtKCLUSa3"
  929. b"bnZ64bd22VmTq8nr4wJbwxvjNHIG0x8oTHGGCHB8AcYE1/0lS80GgmQFCJU3wgB4ZjdfZ"
  930. b"q2xDe8NNlvMjfzmeeZH7tPbl98b35169cOUEpIJiTxT9SIrmVUs2hWh8dUAp54dOrM14s"
  931. b"JHK4D2DKl+j2qrVfjsuq3qEWbohjuAB2Lqk+p1o/8Z5QPmSi/YwnjezH+F8bLQZjqllW0"
  932. b"hvODRmFIL5hFk9JMXi/mi5ZuDleNwSEzP5wtmLnouNQnm3/6fndz7FLt9M/Hruj37gav4"
  933. b"tTjPnasWLFixYoVK1asWLFixYoV63+p0KNot9vnIPQc1vgYOwCSgXfxCoS+QzKHOVXVOj"
  934. b"Fn2ccIfI0k8nXkLuQbyJthxed4UrVnkG8i9yDfgsj3yCAv4foc8t+w1hf5B+Nl5Du43xj"
  935. b"yvxivIN9HpsgPkO2IU9uQfeRn8Xk/iJ4x1Y3nfxH1qecwfhH5+YgT25F7o/0SRdxvOppP"
  936. b"7MX9ZjB/DNnE/OOYX404uRGZIT+FbCFvQ3aQ8f0+/WF0XjJ8nyOw7H+BrmUA/a8pNZf2D"
  937. b"XrCqLG1cERbWHI8ajhznpBY9P0Tr8PkvJDMhTkp/Z0DA6xpuL7DNOq5A+DY9UYTmkOF2U"
  938. b"IO/sNt0wSnGvfdlZssD3rVIlLI9UUX37C6qXzHNntHPNfnTAhWHbUddtBwmegDjAUzZbu"
  939. b"m9lqZmzDmHc8Ik8WY8Tab4Myym4+Gx8V0qw8GtYyWIzrktEJwV9UHv3ktG471rAqHTmFQ"
  940. b"685V5uGqIalk06SWJr7tszR503Ac9cs493jJ8rhrSCIYbXBbzqt5v5+UZ0crh6bGR2dmJ"
  941. b"yuHD428VlLLLdakzJe2VxcKhFSFID73JKPS40RI7tXVCcQ3uOGWhPCJ2bAspiJ2i5Vy6n"
  942. b"jOqMerpEYpEe/Yks4xkU4Tt6BirmzUWanG6ozbFKhve9BsQRaLRTirzqk7hgUktXojKnf"
  943. b"n8jeg3X4QepP3i63po6oml+9t/CwJLya2Bn/ei6f7/4B3Ycdb0L3pt5Q5mNz16rWJ9fLk"
  944. b"vvOff/nxS7//8O2P2gvt7nDDnoV9L1du9N4+ucjl9u/8+a7dC5Nnvjlv9Ox5r+v9Cy0NE"
  945. b"m+c6rv60S/dZw98Gn6MNswcfQiWUvg3wBUAAA=="
  946. ), (
  947. b"H4sIAMY1B1wCA+2Y72scRRjH537U1MZorKLRVjgJSIKdvf11u3dq0jZJ0wRLL+1VvBRrn"
  948. b"J2dvVu6t7vd3bN3sS9ECr6RIkgR9JXQF5aiIk1L/gJF8EVe+KqiKLQQi03tmypojXO3zz"
  949. b"Vp8YW+3y/MPvOZ55lnZthhXjw3Lqx9n0FcqYiFEfaP17AkSLxZVJbQ/1QKbbl/6Mxnqyn"
  950. b"o9iE0uMTtOPTPcTvIJw1w+8DdDCj1KPBozJlVbrO8OcC/xvORH8/P3AT/2+D/DfynuTtH"
  951. b"FKtANKNo6KpKJIs3jSkl3VIkvSAWSiZTlCL3akhXZCKKKmPcaRRJqURFS2MlSTMsgyqMz"
  952. b"6RUp8SQFcmixZJpMlEWi0w1da3Eu3p3+1uW1saPHfpWOSvNXtruDVx4+A0+eAolSpQoUa"
  953. b"JEiRIlSpQoUaJEiRJBTWR9ff191K1p3FM3ySGUEbndjbp1jUwOYkzetkJMr07SqZukgX8"
  954. b"B7ge+DvwI2qijPMjbE8A3gIeB11BcVxGBb8J8FfgW+PcA3wb/FPAfkG8G+C/wl4HvAFPg"
  955. b"v4HtmLOPA/vAT8J534vPmB2C9T+NbfYp8C8DPx1zagfwSJwvpUO+ajye2gP55iF+BtiA+"
  956. b"Nch3ow5/TkwA74IbAFfBnaAl2N+7IN4vfQK8Ffg/w74arx++grwtTg+s7PDk6hXn0OSIC"
  957. b"Gozx3hYzmf0OOkxu6F1/oKlx2PEqfuhRFckv1zB1ClHUasgepR5L+Qz7MWafgOE6jXyCP"
  958. b"Hdpst1CpqC5qK/qUaKIQBFQK/sbGTXmeET8KaCgW7bZsbj3dsY2TSa/gBC0NmTtsOO0ga"
  959. b"LBxF4OuMTNk1nmtjbI60HY90g8MZ8iabC5hlt+53z4bVxVGkCKKgYgmpgiaIXdv5FgS52"
  960. b"5dUQY6P37kbWzcVNzd1cVnO4VoO+7bPcvhV4jj8y4LAC8YsL2iQCIeMNgM7avNxfxeeWp"
  961. b"guHz4yOz2/UCm/cnhy35jcG99/YHZislpd2Fup7OMR5YOVHLZYizI/sj035BBG/BdhP/A"
  962. b"iRiMvwGEUeC5fuxYw6gUmrlGKw5N2ROuMh4c+o+FYvhkGeX7wPD9/PmBmnURgcJ0EJnOZ"
  963. b"iSmV/kM4cV3PsN04uqGp/BM1XTZW4zkCm/L9kbDt0jrfk9cMcdM9absmjojhsI3NU4eE9"
  964. b"d4R+LG4g1qbGFHf9lBrEclwnTCs3r1iuOY2u/+jGVm4iCwiyXpJE61SkUq6RhVW0FVFpo"
  965. b"ZZ0oiu6ppuFSxSFBXTUOQCFRmhhElFQ9XNgiyJhbv/dnf8hnaeETR4R1+sHuX37+c/H/o"
  966. b"kjZ5Nbe88bMvv7voJvYWeOYaGBn7IGkr6xb3X5vqiExNL585/+NyPX3/5jbBzfaibcHhl"
  967. b"4vny9ZHfT6wG0Y6Lfrv/pZXKmS+WyPD4O/2nLy0KKHXo1OjVs1eGPn75o+5DvW3+6D9jd"
  968. b"bFaTBcAAA=="
  969. )]
  970. """
  971. Minimal package generated by running dh_make on empty directory
  972. Then cat test-pkg_1.0-1_amd64.deb | gzip | base64
  973. Then cat test-pkg_1.1-1_amd64.deb | gzip | base64
  974. """
  975. DEB_PACKAGE_GZIP_BASE64 = [(
  976. b"H4sIACTXLlYAA1O0SSxKzrDjSklNykzM003KzEssqlRQUDA0MTG1NDQwNDVTUDBQAAEIa"
  977. b"WhgYGZioqBgogADCVxGegZcyfl5JUX5OXoliUV66VVE6DcwheuX7+ZgAAEW5rdXHb0PG4"
  978. b"iwf5j3WfMT6zWzzMuZgoE3jjYraNzbbFKWGms0SaRw/r2SV23WZ4IdP8preM4yqf0jt95"
  979. b"3c8qnacfNxJUkf9/w+/3X9ph2GEdgQdixrz/niHKKTnYXizf4oSC7tHOz2Zzq+/6vn8/7"
  980. b"ezQ7c1tmi7xZ3SGJ4yzhT2dcr7V+W3zM5ZPu/56PSv4Zdok+7Yv/V/6buWaKVlFkkV58S"
  981. b"N3GmLgnqzRmeZ3V3ymmurS5fGa85/LNx1bpZMin3S6dvXKqydp3ubP1vmyarJZb/qSh62"
  982. b"C8oIdxqm/BtvkGDza+On/Vfv2py7/0LV7VH+qR6a+bkKUbHXt5/SG187d+nps1a5PJfMO"
  983. b"i11dWcUe1HjwaW3Q5RHXn9LmcHy+tW9YcKf0768XVB1t3R0bKrzs5t9P+6r7rZ99svH10"
  984. b"+Q6F/o8tf1fO/32y+fWa14eifd+WxUy0jcxYH7N9/tUvmnUZL74pW32qLeuRU+ZwYGASa"
  985. b"GBgUWBgxM90ayy3VdmykkGDgYErJbEkERydFVWQmCMQo8aWZvAY/WteFRHFwMCYqXTPjI"
  986. b"lBkVEMGLsl+k8XP1D/z+gXyyDOvUemlnHqAVkvu0rRQ2fUFodkN3mtU9uwhqk8V+TqPEE"
  987. b"Nc7fzoQ4n71lqRs/7kbbT0+qOZuKH4r8mjzsc1k/YkCHN8Pjg48fbpE+teHa96LNcfu0V"
  988. b"5n2/Z2xa2KDvaCOx8cqBFxc514uZ3TmadXS+6cpzU7wSzq5SWfapJOD9n6wLXSwtlgxZh"
  989. b"xITzWW7buhx/bb291RcVlEfeC9K5hlrqunSzIMSZT7/Nqgc/qMvMNW227WI8ezB8mVuZh"
  990. b"0hERJSvysfburr4Dx0I9BW57UwR4+e1gxu49PcEt8sbK18Xpvt//Hj5UYm+Zc25q+T4xl"
  991. b"rJvxfVnh80oadq57OZxPaU1bbztv1yF365W4t45Yr+XrFzov237GVY1Zgf7NvE4+W2SuR"
  992. b"lQtLauR1TQ/mbOiIONYya6tU1jPGpWfk/i1+ttiXe3ZO14n0YOWggndznjGlGLyfVbBC6"
  993. b"MRP5aMM7aCco/s7sZqB8RlTQwADw8rnuT/sDHi7mUASjJFRAAbWwNLiAwAA"
  994. ), (
  995. b"H4sIAL05B1wCA1O0SSxKzrDjSklNykzM003KzEssqlRQUDA0NTG2NDc3NjdTUDBQAAEIa"
  996. b"WhgYGZioqBgogADCVxGegZcyfl5JUX5OXoliUV66VVE6De3gOuX7+ZgAAEW5rdXzmbdMR"
  997. b"BgSJj/VeQzQ+ztT/W+EVEnFraKOTlXh6+JXB8RbTRpzgWb2qdLX0+RmTRZcYlyxJutJsk"
  998. b"/pfsfq9yqWZJ4JVVS97jBPPnz1yviluw51b0q4tnrWemCU2a/17mTUBYX0XBC6nH8rvvZ"
  999. b"n/WP7nu40+Jlz7drPNLvCjULQkXOv677OV9s4bPsv5+tvCzPG8s57no479qV/5V/813Kh"
  1000. b"Wy3Pbj4827Jq5v6W/wk7zL1/+zbfH6btVb/3Pm5EapukaJvdgfcape/JZZWe+mZ4+Grby"
  1001. b"7UTaroPzyv9urC1W2MT9+F2bZtWJOyXfGo5dv7DGXJUzee+p930Od0j8QNceNHJffOTr2"
  1002. b"kOJe93mWG+nPdLsG6fz++MV5h1OGr0N9yf3N2ydzQ5x/E9Aw/s9xzmOpULnKtsSZqc/rr"
  1003. b"RQdf/Lu/ckKE9xU5VRuNehbzTr6789a+P2lt2zk5cFqe3N2289+j/hfH2X39/+nvc5vTW"
  1004. b"a/+83pvWqY3e93JWYsmup693HzCOPBk0LI9O7PtiqawN9y8eaTV75DLLL2dNWqTLsTsOn"
  1005. b"7wy0fTe5oLH//7eNf89Co3dRUHJmLRh20s/xhYJkoeYdBgYEhJLEkEJ4uKKkgKIJQyjI3"
  1006. b"gKeOveVVEFAMDY6bSPTMmBkVGMWAqKdF/uviB+n/GwlgGce49MrWMUw/IetlVih46o7Y4"
  1007. b"0uZe/t9lt85aMUrdWhjueTHRd1nr1uK830feH74vcPKU2pkbP4SZnta5PhC9dfPTqvv7f"
  1008. b"n068XRDRDzLuv8Oa5p1L+02ZN127vp6mzSzzFqpLkmbwyl131J1xW58YlcxXSWs0PTbpT"
  1009. b"z28ZUnE/e+NN93weAd40a/zzJ7+Re/v+R7+f3VBVFJCyZsv523ySJ12t7Nt5b8uBu8zuJ"
  1010. b"2Laer//nZCkbXlxtYXvvA8+VSVsCRpo8BawtftKWyZBjkWa6/0X7qXfbF9reH/ro6S63Y"
  1011. b"rCj8t8cltPIOj9H/8LyIxj6bMsZVVtu+ngj6MCNV5JXhOs07RXWxrb3xsqJMDRksx/5bO"
  1012. b"bNtevXz2cdpzzI19Roede4NXxAyK9Dlrtp8JtELLNPWbBe9HfJlj1Hiv69erIFBnX/Pe1"
  1013. b"4QnzLD+p2AiTc383/P+7sW3WoxnXra49iJKJeZy7gc9Z02S57qrvWW3day501VhsbPtfK"
  1014. b"C5nyBG9qjr08E59KY1vUTGRg7mRsCGBimFa+3sTPg7WYCSTBGRgEAzEOeH04EAAA="
  1015. )]
  1016. def run_cmd(self, vm, cmd, user="root"):
  1017. '''Run a command *cmd* in a *vm* as *user*. Return its exit code.
  1018. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1019. :param qubes.vm.qubesvm.QubesVM vm: VM object to run command in
  1020. :param str cmd: command to execute
  1021. :param std user: user to execute command as
  1022. :return int: command exit code
  1023. '''
  1024. try:
  1025. self.loop.run_until_complete(vm.run_for_stdio(cmd))
  1026. except subprocess.CalledProcessError as e:
  1027. return e.returncode
  1028. return 0
  1029. def assertRunCommandReturnCode(self, vm, cmd, expected_returncode):
  1030. p = self.loop.run_until_complete(
  1031. vm.run(cmd, user='root',
  1032. stdout=subprocess.PIPE, stderr=subprocess.PIPE))
  1033. (stdout, stderr) = self.loop.run_until_complete(p.communicate())
  1034. self.assertIn(
  1035. self.loop.run_until_complete(p.wait()), expected_returncode,
  1036. '{}: {}\n{}'.format(cmd, stdout, stderr))
  1037. def setUp(self):
  1038. '''
  1039. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1040. '''
  1041. if not self.template.count('debian') and \
  1042. not self.template.count('fedora'):
  1043. self.skipTest("Template {} not supported by this test".format(
  1044. self.template))
  1045. super(VmUpdatesMixin, self).setUp()
  1046. self.update_cmd = None
  1047. if self.template.count("debian"):
  1048. self.update_cmd = "set -o pipefail; apt-get update 2>&1 | " \
  1049. "{ ! grep '^W:\|^E:'; }"
  1050. self.upgrade_cmd = "apt-get -V dist-upgrade -y"
  1051. self.install_cmd = "apt-get install -y {}"
  1052. self.install_test_cmd = "dpkg -l {}"
  1053. self.exit_code_ok = [0]
  1054. elif self.template.count("fedora"):
  1055. cmd = "yum"
  1056. try:
  1057. # assume template name in form "fedora-XX-suffix"
  1058. if int(self.template.split("-")[1]) > 21:
  1059. cmd = "dnf"
  1060. except ValueError:
  1061. pass
  1062. self.update_cmd = "{cmd} clean all; {cmd} check-update".format(
  1063. cmd=cmd)
  1064. self.upgrade_cmd = "{cmd} upgrade -y".format(cmd=cmd)
  1065. self.install_cmd = cmd + " install -y {}"
  1066. self.install_test_cmd = "rpm -q {}"
  1067. self.exit_code_ok = [0, 100]
  1068. self.init_default_template(self.template)
  1069. self.init_networking()
  1070. self.testvm1 = self.app.add_new_vm(
  1071. qubes.vm.appvm.AppVM,
  1072. name=self.make_vm_name('vm1'),
  1073. label='red')
  1074. self.loop.run_until_complete(self.testvm1.create_on_disk())
  1075. def test_000_simple_update(self):
  1076. '''
  1077. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1078. '''
  1079. self.app.save()
  1080. self.testvm1 = self.app.domains[self.testvm1.qid]
  1081. self.loop.run_until_complete(self.testvm1.start())
  1082. self.assertRunCommandReturnCode(self.testvm1,
  1083. self.update_cmd, self.exit_code_ok)
  1084. def create_repo_apt(self, version=0):
  1085. '''
  1086. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1087. '''
  1088. pkg_file_name = "test-pkg_1.{}-1_amd64.deb".format(version)
  1089. self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
  1090. mkdir -p /tmp/apt-repo \
  1091. && cd /tmp/apt-repo \
  1092. && base64 -d | zcat > {}
  1093. '''.format(pkg_file_name),
  1094. input=self.DEB_PACKAGE_GZIP_BASE64[version]))
  1095. # do not assume dpkg-scanpackage installed
  1096. packages_path = "dists/test/main/binary-amd64/Packages"
  1097. self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
  1098. mkdir -p /tmp/apt-repo/dists/test/main/binary-amd64 \
  1099. && cd /tmp/apt-repo \
  1100. && cat > {packages} \
  1101. && echo MD5sum: $(openssl md5 -r {pkg} | cut -f 1 -d ' ') \
  1102. >> {packages} \
  1103. && echo SHA1: $(openssl sha1 -r {pkg} | cut -f 1 -d ' ') \
  1104. >> {packages} \
  1105. && echo SHA256: $(openssl sha256 -r {pkg} | cut -f 1 -d ' ') \
  1106. >> {packages} \
  1107. && sed -i -e "s,@SIZE@,$(stat -c %s {pkg})," {packages} \
  1108. && gzip < {packages} > {packages}.gz
  1109. '''.format(pkg=pkg_file_name, packages=packages_path),
  1110. input='''\
  1111. Package: test-pkg
  1112. Version: 1.{version}-1
  1113. Architecture: amd64
  1114. Maintainer: unknown <user@host>
  1115. Installed-Size: 25
  1116. Filename: {pkg}
  1117. Size: @SIZE@
  1118. Section: unknown
  1119. Priority: optional
  1120. Description: Test package'''.format(pkg=pkg_file_name, version=version).encode(
  1121. 'utf-8')))
  1122. self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
  1123. mkdir -p /tmp/apt-repo/dists/test \
  1124. && cd /tmp/apt-repo/dists/test \
  1125. && cat > Release \
  1126. && echo '' $(sha256sum {p} | cut -f 1 -d ' ') $(stat -c %s {p}) {p}\
  1127. >> Release \
  1128. && echo '' $(sha256sum {z} | cut -f 1 -d ' ') $(stat -c %s {z}) {z}\
  1129. >> Release
  1130. '''.format(p='main/binary-amd64/Packages',
  1131. z='main/binary-amd64/Packages.gz'),
  1132. input=b'''\
  1133. Label: Test repo
  1134. Suite: test
  1135. Codename: test
  1136. Date: Tue, 27 Oct 2015 03:22:09 UTC
  1137. Architectures: amd64
  1138. Components: main
  1139. SHA256:
  1140. '''))
  1141. def create_repo_yum(self, version=0):
  1142. '''
  1143. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1144. '''
  1145. pkg_file_name = "test-pkg-1.{}-1.fc21.x86_64.rpm".format(version)
  1146. self.loop.run_until_complete(self.netvm_repo.run_for_stdio('''
  1147. mkdir -p /tmp/yum-repo \
  1148. && cd /tmp/yum-repo \
  1149. && base64 -d | zcat > {}
  1150. '''.format(pkg_file_name), input=self.RPM_PACKAGE_GZIP_BASE64[
  1151. version]))
  1152. # createrepo is installed by default in Fedora template
  1153. self.loop.run_until_complete(self.netvm_repo.run_for_stdio(
  1154. 'createrepo /tmp/yum-repo'))
  1155. def create_repo_and_serve(self):
  1156. '''
  1157. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1158. '''
  1159. if self.template.count("debian") or self.template.count("whonix"):
  1160. self.create_repo_apt()
  1161. self.loop.run_until_complete(self.netvm_repo.run(
  1162. 'cd /tmp/apt-repo && python -m SimpleHTTPServer 8080',
  1163. stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL))
  1164. elif self.template.count("fedora"):
  1165. self.create_repo_yum()
  1166. self.loop.run_until_complete(self.netvm_repo.run(
  1167. 'cd /tmp/yum-repo && python -m SimpleHTTPServer 8080',
  1168. stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL))
  1169. else:
  1170. # not reachable...
  1171. self.skipTest("Template {} not supported by this test".format(
  1172. self.template))
  1173. def add_update_to_repo(self):
  1174. if self.template.count("debian") or self.template.count("whonix"):
  1175. self.create_repo_apt(1)
  1176. elif self.template.count("fedora"):
  1177. self.create_repo_yum(1)
  1178. def configure_test_repo(self):
  1179. """
  1180. Configure test repository in test-vm and disable rest of them.
  1181. The critical part is to use "localhost" - this will work only when
  1182. accessed through update proxy and this is exactly what we want to
  1183. test here.
  1184. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1185. """
  1186. if self.template.count("debian") or self.template.count("whonix"):
  1187. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  1188. "rm -f /etc/apt/sources.list.d/* &&"
  1189. "echo 'deb [trusted=yes] http://localhost:8080 test main' "
  1190. "> /etc/apt/sources.list",
  1191. user="root"))
  1192. elif self.template.count("fedora"):
  1193. self.loop.run_until_complete(self.testvm1.run_for_stdio(
  1194. "rm -f /etc/yum.repos.d/*.repo &&"
  1195. "echo '[test]' > /etc/yum.repos.d/test.repo &&"
  1196. "echo 'name=Test repo' >> /etc/yum.repos.d/test.repo &&"
  1197. "echo 'gpgcheck=0' >> /etc/yum.repos.d/test.repo &&"
  1198. "echo 'baseurl=http://localhost:8080/'"
  1199. " >> /etc/yum.repos.d/test.repo",
  1200. user="root"
  1201. ))
  1202. else:
  1203. # not reachable...
  1204. self.skipTest("Template {} not supported by this test".format(
  1205. self.template))
  1206. def test_010_update_via_proxy(self):
  1207. '''
  1208. Test both whether updates proxy works and whether is actually used
  1209. by the VM
  1210. :type self: qubes.tests.SystemTestCase | VmUpdatesMixin
  1211. '''
  1212. if self.template.count("minimal"):
  1213. self.skipTest("Template {} not supported by this test".format(
  1214. self.template))
  1215. self.netvm_repo = self.app.add_new_vm(
  1216. qubes.vm.appvm.AppVM,
  1217. name=self.make_vm_name('net'),
  1218. label='red')
  1219. self.netvm_repo.provides_network = True
  1220. self.loop.run_until_complete(self.netvm_repo.create_on_disk())
  1221. self.testvm1.netvm = self.netvm_repo
  1222. self.netvm_repo.features['service.qubes-updates-proxy'] = True
  1223. # TODO: consider also adding a test for the template itself
  1224. self.testvm1.features['service.updates-proxy-setup'] = True
  1225. self.app.save()
  1226. # Setup test repo
  1227. self.loop.run_until_complete(self.netvm_repo.start())
  1228. self.create_repo_and_serve()
  1229. # Configure local repo
  1230. self.loop.run_until_complete(self.testvm1.start())
  1231. self.configure_test_repo()
  1232. with self.qrexec_policy('qubes.UpdatesProxy', self.testvm1,
  1233. '$default', action='allow,target=' + self.netvm_repo.name):
  1234. # update repository metadata
  1235. self.assertRunCommandReturnCode(self.testvm1,
  1236. self.update_cmd, self.exit_code_ok)
  1237. # install test package
  1238. self.assertRunCommandReturnCode(self.testvm1,
  1239. self.install_cmd.format('test-pkg'), self.exit_code_ok)
  1240. # verify if it was really installed
  1241. self.assertRunCommandReturnCode(self.testvm1,
  1242. self.install_test_cmd.format('test-pkg'), self.exit_code_ok)
  1243. def test_020_updates_available_notification(self):
  1244. # override with StandaloneVM
  1245. self.testvm1 = self.app.add_new_vm(
  1246. qubes.vm.standalonevm.StandaloneVM,
  1247. name=self.make_vm_name('vm2'),
  1248. label='red')
  1249. tpl = self.app.domains[self.template]
  1250. self.testvm1.clone_properties(tpl)
  1251. self.testvm1.features.update(tpl.features)
  1252. self.loop.run_until_complete(
  1253. self.testvm1.clone_disk_files(tpl))
  1254. self.loop.run_until_complete(self.testvm1.start())
  1255. self.netvm_repo = self.testvm1
  1256. self.create_repo_and_serve()
  1257. self.configure_test_repo()
  1258. self.loop.run_until_complete(
  1259. self.testvm1.run_for_stdio(
  1260. '/usr/lib/qubes/upgrades-status-notify',
  1261. user='root',
  1262. ))
  1263. self.assertFalse(self.testvm1.features.get('updates-available', False))
  1264. # update repository metadata
  1265. self.assertRunCommandReturnCode(
  1266. self.testvm1, self.update_cmd, self.exit_code_ok)
  1267. # install test package
  1268. self.assertRunCommandReturnCode(
  1269. self.testvm1, self.install_cmd.format('test-pkg'), self.exit_code_ok)
  1270. self.assertFalse(self.testvm1.features.get('updates-available', False))
  1271. self.add_update_to_repo()
  1272. # update repository metadata
  1273. self.assertRunCommandReturnCode(
  1274. self.testvm1, self.update_cmd, self.exit_code_ok)
  1275. self.loop.run_until_complete(
  1276. self.testvm1.run_for_stdio(
  1277. '/usr/lib/qubes/upgrades-status-notify',
  1278. user='root',
  1279. ))
  1280. self.assertTrue(self.testvm1.features.get('updates-available', False))
  1281. # install updates
  1282. self.assertRunCommandReturnCode(
  1283. self.testvm1, self.upgrade_cmd, self.exit_code_ok)
  1284. self.assertFalse(self.testvm1.features.get('updates-available', False))
  1285. def create_testcases_for_templates():
  1286. yield from qubes.tests.create_testcases_for_templates('VmNetworking',
  1287. VmNetworkingMixin, qubes.tests.SystemTestCase,
  1288. module=sys.modules[__name__])
  1289. yield from qubes.tests.create_testcases_for_templates('VmIPv6Networking',
  1290. VmIPv6NetworkingMixin, qubes.tests.SystemTestCase,
  1291. module=sys.modules[__name__])
  1292. yield from qubes.tests.create_testcases_for_templates('VmUpdates',
  1293. VmUpdatesMixin, qubes.tests.SystemTestCase,
  1294. module=sys.modules[__name__])
  1295. def load_tests(loader, tests, pattern):
  1296. tests.addTests(loader.loadTestsFromNames(
  1297. create_testcases_for_templates()))
  1298. return tests
  1299. qubes.tests.maybe_create_testcases_on_import(create_testcases_for_templates)